700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > Netty 全网最详细的教程! 没有之一!

Netty 全网最详细的教程! 没有之一!

时间:2024-08-02 18:12:36

相关推荐

Netty 全网最详细的教程! 没有之一!

Netty 入门

文章目录

Netty 入门入门案例1、服务器端代码2、客户端代码3、运行流程组件解释使用组件EventLoop创建一个EventLoopServer创建一个Client细分 EventLoopGroupChannelChannel 的常用方法带有Future, Promise的类型都是和异步方法配套使用, 用来处理结果获取关闭后的Channel, 用来执行善后工作Future & Promise概念测试Jdk的Future测试Netty的FUture测试Netty的PromiseHandler & Pipeline编写服务器使用EmbeddedChannel进行测试ByteBufdemo直接内存与堆内存池化与非池化组成写入扩容扩容规则读取retain & release释放规则切片Composite优势

入门案例

1、服务器端代码

public class HelloServer {public static void main(String[] args) {// 1、启动器,负责装配netty组件,启动服务器new ServerBootstrap()// 2、创建 NioEventLoopGroup,可以简单理解为 线程池 + Selector.group(new NioEventLoopGroup())// 3、选择服务器的 ServerSocketChannel 实现.channel(NioServerSocketChannel.class)// 4、child 负责处理读写,该方法决定了 child 执行哪些操作// ChannelInitializer 处理器(仅执行一次)// 它的作用是待客户端SocketChannel建立连接后,执行initChannel以便添加更多的处理器.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {// 5、SocketChannel的处理器,使用StringDecoder解码,ByteBuf=>StringnioSocketChannel.pipeline().addLast(new StringDecoder());// 6、SocketChannel的业务处理,使用上一个处理器的处理结果nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<String>() {@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {System.out.println(s);}});}// 7、ServerSocketChannel绑定8080端口}).bind(8080);}}Copy

2、客户端代码

public class HelloClient {public static void main(String[] args) throws InterruptedException {new Bootstrap().group(new NioEventLoopGroup())// 选择客户 Socket 实现类,NioSocketChannel 表示基于 NIO 的客户端实现.channel(NioSocketChannel.class)// ChannelInitializer 处理器(仅执行一次)// 它的作用是待客户端SocketChannel建立连接后,执行initChannel以便添加更多的处理器.handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) throws Exception {// 消息会经过通道 handler 处理,这里是将 String => ByteBuf 编码发出channel.pipeline().addLast(new StringEncoder());}})// 指定要连接的服务器和端口.connect(new InetSocketAddress("localhost", 8080))// Netty 中很多方法都是异步的,如 connect// 这时需要使用 sync 方法等待 connect 建立连接完毕.sync()// 获取 channel 对象,它即为通道抽象,可以进行数据读写操作.channel()// 写入消息并清空缓冲区.writeAndFlush("hello world");}}Copy

3、运行流程

左:客户端 右:服务器端

组件解释

channel 可以理解为数据的通道msg 理解为流动的数据,最开始输入是 ByteBuf,但经过 pipeline 中的各个 handler 加工,会变成其它类型对象,最后输出又变成 ByteBufhandler 可以理解为数据的处理工序 工序有多道,合在一起就是 pipeline(传递途径),pipeline 负责发布事件(读、读取完成…)传播给每个 handler, handler 对自己感兴趣的事件进行处理(重写了相应事件处理方法) pipeline 中有多个 handler,处理时会依次调用其中的 handler handler 分 Inbound 和 Outbound 两类 Inbound 入站Outbound 出站 eventLoop 可以理解为处理数据的工人eventLoop 可以管理多个 channel 的 io 操作,并且一旦 eventLoop 负责了某个 channel,就会将其与channel进行绑定,以后该 channel 中的 io 操作都由该 eventLoop 负责eventLoop 既可以执行io 操作也可以进行任务处理,每个 eventLoop 有自己的任务队列,队列里可以堆放多个 channel 的待处理任务,任务分为普通任务、定时任务eventLoop 按照pipeline 顺序,依次按照 handler 的规划(代码)处理数据,可以为每个 handler 指定不同的 eventLoop

使用组件

EventLoop

package ty;import ty.channel.nio.NioEventLoopGroup;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.TimeUnit;/*** @Author:nioliu* @DATE: /9/9 14:27*/@Slf4jpublic class TestEventLoop {public static void main(String[] args) {// 1. 创建事件循环组// 可以负责 io 事件, 普通任务, 定时任务(内部使用ScheduledThreadPool实现)// 默认使用MAX(1,电脑核心数*2)线程数NioEventLoopGroup group = new NioEventLoopGroup(4);// 普通任务, 定时任务// DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup();// 2. 获取下一个事件循环对象(相当于一个循环链表做轮询), 上面设置为4, 那么每4个为依次循环System.out.println(group.next());System.out.println(group.next());System.out.println(group.next());System.out.println(group.next());System.out.println(group.next());System.out.println(group.next());System.out.println(group.next());// 3. 执行普通任务group.next().submit(new Runnable() {@Overridepublic void run() {log.debug("这是个普通任务");}});// 4. 执行定时循环任务group.next().scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}log.debug("OK");}},3,10, TimeUnit.SECONDS);log.debug("main");// 优雅的关闭(任务全部执行完后关闭)group.shutdownGracefully();}}

创建一个EventLoopServer

package ponents;import ty.bootstrap.ServerBootstrap;import ty.buffer.ByteBuf;import ty.channel.ChannelHandlerContext;import ty.channel.ChannelInboundHandlerAdapter;import ty.channel.ChannelInitializer;import ty.channel.nio.NioEventLoopGroup;import ty.channel.socket.nio.NioServerSocketChannel;import ty.channel.socket.nio.NioSocketChannel;import lombok.extern.slf4j.Slf4j;import java.nio.charset.Charset;/*** @Author:nioliu* @DATE: /9/9 14:39*/@Slf4jpublic class EventLoopServer {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buffer = (ByteBuf) msg;String s = buffer.toString(Charset.defaultCharset());log.debug(s);}});}}).bind(8080);}}

创建一个Client

package ponents;import ty.bootstrap.Bootstrap;import ty.channel.Channel;import ty.channel.ChannelInitializer;import ty.channel.nio.NioEventLoopGroup;import ty.channel.socket.nio.NioSocketChannel;import ty.handler.codec.string.StringEncoder;/*** @Author:nioliu* @DATE: /9/9 14:44*/public class EventLoopClient {public static void main(String[] args) throws InterruptedException {Channel channel = new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline().addLast(new StringEncoder());// 内部使用CharBuffer.wrap(msg)}}).connect("localhost", 8080).sync().channel();//.writeAndFlush("我是nio");// 使用debug模式向服务器发送数据System.out.println(channel);}}

在System.out.println(channel); 处打上断电, 并设置只切断当前Thread

使用Evaluate Expression工具进行信息发送(调用channel…writeAndFlush(“我是nio”)😉

细分 EventLoopGroup

有些时候我们可能需要处理很大的任务, 这样即使有两个EventLoopGroup也难以解决这种问题, 那么我们就可以独立出一个Group, 专门用来处理这种事件

package ponents;import ty.bootstrap.ServerBootstrap;import ty.buffer.ByteBuf;import ty.channel.ChannelHandlerContext;import ty.channel.ChannelInboundHandlerAdapter;import ty.channel.ChannelInitializer;import ty.channel.EventLoopGroup;import ty.channel.nio.NioEventLoopGroup;import ty.channel.socket.nio.NioServerSocketChannel;import ty.channel.socket.nio.NioSocketChannel;import lombok.extern.slf4j.Slf4j;import java.nio.charset.Charset;/*** @Author:nioliu* @DATE: /9/9 14:39*/@Slf4jpublic class EventLoopServer {public static void main(String[] args) {// 细分: 独立出来一个EventLoopGroup来处理耗时较长的任务EventLoopGroup eventExecutors = new NioEventLoopGroup();new ServerBootstrap().group(new NioEventLoopGroup(),new NioEventLoopGroup(2))// (parentGroup, childGroup)--->(负责Accept事件, 负责I/O事件).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline().addLast("handler1",new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buffer = (ByteBuf) msg;String s = buffer.toString(Charset.defaultCharset());log.debug(s);ctx.fireChannelRead(msg);// 让消息传递给下一个handler}}).addLast(eventExecutors, "handler2", new ChannelInboundHandlerAdapter() {// 传入指定的eventGroup@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buffer = (ByteBuf) msg;String s = buffer.toString(Charset.defaultCharset());log.debug(s);}});}}).bind(8080);}}

通过启动客户端发送消息

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QL8BXWzV-1631245976996)(C:\Users\nioliu\AppData\Roaming\Typora\typora-user-images\image-0909151849785.png)]

有以下输出:

[DEBUG] -09-09 15:18:27,062 thread:nioEventLoopGroup-4-2 method:ponents.EventLoopServer$1$2.channelRead(EventLoopServer.java:37)查看线程号[DEBUG] -09-09 15:18:27,063 thread:nioEventLoopGroup-2-2 method:ponents.EventLoopServer$1$1.channelRead(EventLoopServer.java:45)查看线程号

注意到两次输出的thread名不一样(nioEventLoopGroup-4-2, nioEventLoopGroup-2-2 ), 说明提交给不同的Group执行, 其中nioEventLoopGroup-4-2的4指的是第4个Group, 2为当前Group的线程号

Channel

Channel 的常用方法

close() 可以用来关闭ChannelcloseFuture() 用来处理 Channel 的关闭 sync 方法作用是同步等待 Channel 关闭而 addListener 方法是异步等待 Channel 关闭 pipeline() 方法用于添加处理器write() 方法将数据写入 因为缓冲机制,数据被写入到 Channel 中以后,不会立即被发送只有当缓冲满了或者调用了flush()方法后,才会将数据通过 Channel 发送出去 writeAndFlush() 方法将数据写入并立即发送(刷出)

带有Future, Promise的类型都是和异步方法配套使用, 用来处理结果

package ponents;import ty.bootstrap.Bootstrap;import ty.channel.Channel;import ty.channel.ChannelFuture;import ty.channel.ChannelFutureListener;import ty.channel.ChannelInitializer;import ty.channel.nio.NioEventLoopGroup;import ty.channel.socket.nio.NioSocketChannel;import ty.handler.codec.string.StringEncoder;/*** @Author:nioliu* @DATE: /9/9 14:44*/public class EventLoopClient {public static void main(String[] args) throws InterruptedException {ChannelFuture channelFuture = new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline().addLast(new StringEncoder());// 内部使用CharBuffer.wrap(msg)}})// 连接到服务器// connect是一个异步非阻塞方法(即发起调用的main线程不阻塞, 把任务交给connect线程).connect("localhost", 8080);// 2.1 使用sync 同步处理结果channelFuture// 下面都是ChannelFuture的方法.sync()// 这是一个阻塞方法, 同来同步结果, 一旦建立连接后才往下执行.channel()// 获取当前Channel.writeAndFlush("我是nio");// 使用这个channel发送消息// 2.2 使用addListener(回调对象)方法异步处理结果channelFuture.addListener(new ChannelFutureListener() {/*** 建立好后, 会调用这个方法* @param channelFuture 建立好的channel* @throws Exception*/@Overridepublic void operationComplete(ChannelFuture channelFuture) throws Exception {Channel channel = channelFuture.channel();channel.writeAndFlush("建立好channel了");}});}}

获取关闭后的Channel, 用来执行善后工作

package ponents;import ty.bootstrap.Bootstrap;import ty.channel.Channel;import ty.channel.ChannelFuture;import ty.channel.ChannelInitializer;import ty.channel.nio.NioEventLoopGroup;import ty.channel.socket.nio.NioSocketChannel;import ty.handler.codec.string.StringEncoder;/*** @Author:nioliu* @DATE: /9/9 14:44*/public class EventLoopClient {public static void main(String[] args) throws InterruptedException {NioEventLoopGroup group = new NioEventLoopGroup();ChannelFuture channelFuture = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline().addLast(new StringEncoder());// 内部使用CharBuffer.wrap(msg)}})// 连接到服务器// connect是一个异步非阻塞方法(即发起调用的main线程不阻塞, 把任务交给connect线程).connect("localhost", 8080);// 获取关闭后的channelChannel channel = channelFuture.sync().channel();ChannelFuture closeFuture = channel.closeFuture();closeFuture.sync();// 阻塞// 执行关闭后的操作group.shutdownGracefully();// 先拒绝接受新的任务, 把现有的任务能运行玩的运行完, 然后再停止}}

Future & Promise

概念

netty 中的 Future 与 jdk 中的 Future同名,但是是两个接口

netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展

jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器

测试Jdk的Future

package ponents;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*;/*** @Author:nioliu* @DATE: /9/10 9:15*/@Slf4jpublic class TestJdkFuture {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建线程池ExecutorService executorService = Executors.newFixedThreadPool(2);// 2. 提交任务, callable有返回结果, runnable没有返回结果Future<Integer> future = executorService.submit(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {Thread.sleep(5000);return 50;}});// 阻塞方法 get(), 等到有结果System.out.println(future.get());System.out.println("取消了吗?:" + future.isCancelled());future.cancel(true);// true代表强制cancelSystem.out.println("取消了吗?:" + future.isCancelled());System.out.println("完成了嘛?:" + future.isDone());System.out.println("线程池结束了吗?:"+executorService.isShutdown());executorService.shutdown();System.out.println("取消了吗?:" + future.isCancelled());System.out.println("线程池结束了吗?:"+executorService.isShutdown());}}

测试Netty的FUture

package ponents;import ty.channel.EventLoop;import ty.channel.nio.NioEventLoopGroup;import ty.util.concurrent.Future;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;/*** Netty的Future提供了更强大的功能, 可以实时取到运行状态* @Author:nioliu* @DATE: /9/10 9:22*/@Slf4jpublic class TestNettyFuture {public static void main(String[] args) throws ExecutionException, InterruptedException {NioEventLoopGroup group = new NioEventLoopGroup();// 会有多个loop(executor)// 拿到一个EventLoopEventLoop loop = group.next();Future<Integer> future = loop.submit(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {log.debug("正在运行...");Thread.sleep(10000);return 70;}});log.debug("等待结果");log.debug("结果是" + future.getNow());group.shutdownGracefully();}}

测试Netty的Promise

package ponents;import ty.channel.nio.NioEventLoopGroup;import ty.util.concurrent.DefaultPromise;/*** Promise可以设置Future的结果** @Author:nioliu* @DATE: /9/10 9:32*/public class TestNettyPromise {public static void main(String[] args) throws InterruptedException {// Promise 相当于一个结果容器DefaultPromise<Object> promise = new DefaultPromise<>(new NioEventLoopGroup().next());// 主动创建Promise对象new Thread(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}// 向Promise容器填充对象promise.setSuccess(80);}).start();System.out.println(promise.getNow());Thread.sleep(1000);System.out.println(promise.getNow());}}

Handler & Pipeline

编写服务器

package ponents;import ty.bootstrap.ServerBootstrap;import ty.channel.*;import ty.channel.nio.NioEventLoopGroup;import ty.channel.socket.nio.NioServerSocketChannel;import ty.channel.socket.nio.NioSocketChannel;import lombok.extern.slf4j.Slf4j;/*** @Author:nioliu* @DATE: /9/10 9:43*/@Slf4jpublic class TestPipeline {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {// 1. 通过channel拿到pipelineChannelPipeline pipeline = nioSocketChannel.pipeline();// 2. 添加Handler: head -> h1 -> h2 -> h3 -> tailpipeline.addLast("h1", new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("h1");// 内部执行ctx.fireChannelRead(msg); 传递且只能传递给下一个handler, 如果不调用, 那么调用链就会断开super.channelRead(ctx, msg);}});pipeline.addLast("h2", new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("h2");super.channelRead(ctx, msg);}});pipeline.addLast("h3", new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("h3");// super.channelRead(ctx, msg);// 后面没有了, 所以这个方法没用// 写一些数据, 触发出站处理器// 使用pipeline则从最后开始找出站处理器h6->h5->h4pipeline.writeAndFlush(ctx.alloc().buffer().writeBytes("Server".getBytes()));// 使用cxt从当前处理器往前找出站处理器, 即从h3往前找, 那么就找不到h4,h5,h6ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("Server".getBytes()));}});// 增加出站处理器(建议使用addFirst, 不易弄混) ---> 只有写数据的时候才会触发write方法// 出站顺序: 6->5->4pipeline.addLast("h4", new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("h4");super.write(ctx, msg, promise);}});pipeline.addLast("h5", new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("h5");super.write(ctx, msg, promise);}});pipeline.addLast("h6", new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("h6");super.write(ctx, msg, promise);}});}}).bind(8080);}}

使用EmbeddedChannel进行测试

不再需要启动Client和Server

package ponents;import ty.channel.ChannelHandlerContext;import ty.channel.ChannelInboundHandlerAdapter;import ty.channel.ChannelOutboundHandlerAdapter;import ty.channel.ChannelPromise;import ty.channel.embedded.EmbeddedChannel;import lombok.extern.slf4j.Slf4j;/*** @Author:nioliu* @DATE: /9/10 10:08*/@Slf4jpublic class TestEmbeddedChannel {public static void main(String[] args) {ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("h1");super.channelRead(ctx,msg);}};ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("h2");super.channelRead(ctx,msg);}};ChannelInboundHandlerAdapter h3 = new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("h3");super.channelRead(ctx,msg);}};ChannelInboundHandlerAdapter h4 = new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("h4");super.channelRead(ctx,msg);}};ChannelOutboundHandlerAdapter h5 = new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("h5");super.write(ctx, msg, promise);}};ChannelOutboundHandlerAdapter h6 = new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("h6");super.write(ctx, msg, promise);}};EmbeddedChannel embeddedChannel = new EmbeddedChannel(h1, h2, h3, h4, h5, h6);embeddedChannel.writeInbound("hello1");// 测试入站embeddedChannel.writeOutbound("hello2");// 测试出站}}

ByteBuf

demo

package ty.Bytebuf;import ty.buffer.ByteBuf;import ty.buffer.ByteBufAllocator;import static ty.util.internal.StringUtil.NEWLINE;/*** @Author:nioliu* @DATE: /9/10 10:17*/public class TestByteBuf {public static void main(String[] args) {ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();// ByteBuf可以动态扩容(初始为256)System.out.println(buffer);StringBuilder sb = new StringBuilder();for (int i = 0; i < 300; i++) {sb.append("a");}buffer.writeBytes(sb.toString().getBytes());// 写入字节数组System.out.println(buffer);// 扩容至512}

ByteBuf通过ByteBufAllocator选择allocator并调用对应的buffer()方法来创建的,默认使用直接内存作为ByteBuf,容量为256个字节,可以指定初始容量的大小

当ByteBuf的容量无法容纳所有数据时,ByteBuf会进行扩容操作

如果在handler中创建ByteBuf,建议使用ChannelHandlerContext ctx.alloc().buffer()来创建

直接内存与堆内存

通过该方法创建的ByteBuf,使用的是基于直接内存的ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);

可以使用下面的代码来创建池化基于堆的 ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(16);Copy

也可以使用下面的代码来创建池化基于直接内存的 ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(16);

直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放

池化与非池化

池化的最大意义在于可以重用ByteBuf,优点有

没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率高并发时,池化功能更节约内存,减少内存溢出的可能

池化功能是否开启,可以通过下面的系统环境变量来设置

-ty.allocator.type={unpooled|pooled}

4.1 以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现4.1 之前,池化功能还不成熟,默认是非池化实现

组成

ByteBuf主要有以下几个组成部分

最大容量与当前容量

在构造ByteBuf时,可传入两个参数,分别代表初始容量和最大容量,若未传入第二个参数(最大容量),最大容量默认为Integer.MAX_VALUE当ByteBuf容量无法容纳所有数据时,会进行扩容操作,若超出最大容量,会抛出java.lang.IndexOutOfBoundsException异常

读写操作不同于ByteBuffer只用position进行控制,

ByteBuf分别由读指针和写指针两个指针控制

。进行读写操作时,无需进行模式的切换

读指针前的部分被称为废弃部分,是已经读过的内容读指针与写指针之间的空间称为可读部分写指针与当前容量之间的空间称为可写部分

写入

常用方法如下

注意

这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用来写入不同的数据网络传输中,默认习惯是 Big Endian,使用 writeInt(int value)

使用方法

public class ByteBufStudy {public static void main(String[] args) {// 创建ByteBufByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);ByteBufUtil.log(buffer);// 向buffer中写入数据buffer.writeBytes(new byte[]{1, 2, 3, 4});ByteBufUtil.log(buffer);buffer.writeInt(5);ByteBufUtil.log(buffer);buffer.writeIntLE(6);ByteBufUtil.log(buffer);buffer.writeLong(7);ByteBufUtil.log(buffer);}}

运行结果

read index:0 write index:0 capacity:16read index:0 write index:4 capacity:16+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f |+--------+-------------------------------------------------+----------------+|00000000| 01 02 03 04 |.... |+--------+-------------------------------------------------+----------------+read index:0 write index:8 capacity:16+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f |+--------+-------------------------------------------------+----------------+|00000000| 01 02 03 04 00 00 00 05|........ |+--------+-------------------------------------------------+----------------+read index:0 write index:12 capacity:16+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f |+--------+-------------------------------------------------+----------------+|00000000| 01 02 03 04 00 00 00 05 06 00 00 00 |............ |+--------+-------------------------------------------------+----------------+read index:0 write index:20 capacity:20+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f |+--------+-------------------------------------------------+----------------+|00000000| 01 02 03 04 00 00 00 05 06 00 00 00 00 00 00 00 |................||00000010| 00 00 00 07 |.... |+--------+-------------------------------------------------+----------------+

还有一类方法是set 开头的一系列方法,也可以写入数据,但不会改变写指针位置

扩容

当ByteBuf中的容量无法容纳写入的数据时,会进行扩容操作

buffer.writeLong(7);ByteBufUtil.log(buffer);Copy// 扩容前read index:0 write index:12 capacity:16...// 扩容后read index:0 write index:20 capacity:20+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f |+--------+-------------------------------------------------+----------------+|00000000| 01 02 03 04 00 00 00 05 06 00 00 00 00 00 00 00 |................||00000010| 00 00 00 07 |.... |+--------+-------------------------------------------------+----------------+

扩容规则

如果写入后数据大小未超过 512 字节,则选择下一个 16 的整数倍进行扩容

例如写入后大小为 12 字节,则扩容后 capacity 是 16 字节

如果写入后数据大小超过512字节,则选择下一个2n如果写入后数据大小超过 512 字节,则选择下一个 2^n 如果写入后数据大小超过512字节,则选择下一个2n

例如写入后大小为 513 字节,则扩容后 capacity 是 210=1024 字节(29=512 已经不够了)

扩容不能超过maxCapacity,否则会抛出java.lang.IndexOutOfBoundsException异常

Exception in thread "main" java.lang.IndexOutOfBoundsException: writerIndex(20) + minWritableBytes(8) exceeds maxCapacity(20): PooledUnsafeDirectByteBuf(ridx: 0, widx: 20, cap: 20/20)...

读取

读取主要是通过一系列read方法进行读取,读取时会根据读取数据的字节数移动读指针

如果需要重复读取,需要调用buffer.markReaderIndex()对读指针进行标记,并通过buffer.resetReaderIndex()将读指针恢复到mark标记的位置

public class ByteBufStudy {public static void main(String[] args) {// 创建ByteBufByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);// 向buffer中写入数据buffer.writeBytes(new byte[]{1, 2, 3, 4});buffer.writeInt(5);// 读取4个字节System.out.println(buffer.readByte());System.out.println(buffer.readByte());System.out.println(buffer.readByte());System.out.println(buffer.readByte());ByteBufUtil.log(buffer);// 通过mark与reset实现重复读取buffer.markReaderIndex();System.out.println(buffer.readInt());ByteBufUtil.log(buffer);// 恢复到mark标记处buffer.resetReaderIndex();ByteBufUtil.log(buffer);}}1234read index:4 write index:8 capacity:16+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f |+--------+-------------------------------------------------+----------------+|00000000| 00 00 00 05 |.... |+--------+-------------------------------------------------+----------------+5read index:8 write index:8 capacity:16read index:4 write index:8 capacity:16+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f |+--------+-------------------------------------------------+----------------+|00000000| 00 00 00 05 |.... |+--------+-------------------------------------------------+----------------+

还有以 get 开头的一系列方法,这些方法不会改变读指针的位置

retain & release

由于 Netty 中有堆外内存(直接内存)的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。

UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存

Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了ReferenceCounted接口

每个 ByteBuf 对象的初始计数为 1调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用

释放规则

因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在每个 ChannelHandler 中都去调用 release ,就失去了传递性(如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命,那么便无须再传递)

基本规则是,谁是最后使用者,谁负责 release

起点,对于 NIO 实现来讲,在 ty.channel.nio.AbstractNioByteChannel.NioByteUnsafe.read 方法中首次创建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))

入站 ByteBuf 处理原则

对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release如果不调用 ctx.fireChannelRead(msg) 向后传递,那么也必须 release注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)

出站 ByteBuf 处理原则

出站消息最终都会转为 ByteBuf 输出,一直向前传,由 HeadContext flush 后 release

异常处理原则

有时候不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true

while (!buffer.release()) {}Copy

当ByteBuf被传到了pipeline的head与tail时,ByteBuf会被其中的方法彻底释放,但前提是ByteBuf被传递到了head与tail中

切片

ByteBuf切片是【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针

得到分片后的buffer后,要调用其retain方法,使其内部的引用计数加一。避免原ByteBuf释放,导致切片buffer无法使用

修改原ByteBuf中的值,也会影响切片后得到的ByteBuf

package ty.Bytebuf;import ty.buffer.ByteBuf;import ty.buffer.ByteBufAllocator;/*** @Author:nioliu* @DATE: /9/10 11:14*/public class TestSlice {public static void main(String[] args) {ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);buf.writeBytes(new byte[]{'1', '2', '3', '4', '5', 'a', 'b', 'c', 'd', 'e'});// 在切片过程中没有发生数据复制, 使用的是同一块内存// 1,2,3,4,5ByteBuf f1 = buf.slice(0, 5);f1.retain();// 引用计数+1, 那么release()的时候并没有直接释放, 除非f1也释放了// a,b,c,d,eByteBuf f2 = buf.slice(5, 5);// 切片后的ByteBuf不能写入, 但是可以替换f1.setByte(0, 'a');// 释放原来的buf, 那么其切片都会被释放buf.release();f1.release();}}

Composite

将许多小的ByteBuf组合成一个ByteBuf

package ty.Bytebuf;import ty.buffer.ByteBuf;import ty.buffer.ByteBufAllocator;import io.positeByteBuf;/*** @Author:nioliu* @DATE: /9/10 11:29*/public class TestCompositeByteBuf {public static void main(String[] args) {ByteBuf buffer1 = ByteBufAllocator.DEFAULT.buffer();buffer1.writeBytes(new byte[]{'a', 'b', 'c', 'd'});ByteBuf buffer2 = ByteBufAllocator.DEFAULT.buffer();buffer2.writeBytes(new byte[]{'1', '2', '3', '4', '5',});CompositeByteBuf compositeByteBuf = positeBuffer();compositeByteBuf.addComponents( buffer1, buffer2);System.out.println(compositeByteBuf);}}

优势

池化思想 - 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能读写指针分离,不需要像 ByteBuffer 一样切换读写模式可以自动扩容支持链式调用,使用更流畅很多地方体现零拷贝,例如 slice、duplicate、CompositeByteBuf

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。