核心组件
网络通信层
ByteBuf
ByteBuf 是网络通信传输数据时的字节数组,可以看作是 Netty 对 Java
NIO
的ByteBuffer
字节容器的封装和抽象,更便于使用和更好的性能
-
ByteBuf 的创建方式
//直接内存 ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(); //堆内存 ByteBuf heapBuf = ByteBufAllocator.DEFAULT.heapBuffer(); //直接内存 ByteBuf directBuf = ByteBufAllocator.DEFAULT.directBuffer();
-
直接内存的特点
- 直接内存创建和销毁的开销大,读写性能高(mmap,少一次内存拷贝),适合配合池化功能
- 直接内存不受JVM内存回收管理,GC压力小,但建议及时主动释放
-
池化技术
- 重用ByteBuf,可以减少ByteBuf的创建,对于直接内存减小开销,对于堆内存减小GC压力
- 采用了
jemalloc
类似的内存分配算法提升分配效率(记不住) - 非安卓平台默认情况下都开启
-
ByteBuf 的机制
-
ByteBuf 可扩容,可指定最大容量,默认情况为
Integer.MAX_VLAUE
-
读写指针,比 nio 的指针好用,不用切换读写模式
- 可以获取读写指针,到时候再从这个位置读/写
-
使用API,简单易用
-
-
内存回收
-
提供统一个接口
ReferenceCounted
,每种 ByteBuf 都实现了该接口public interface ReferenceCounted { int refCnt(); ReferenceCounted retain(); ReferenceCounted retain(int var1); ReferenceCounted touch(); ReferenceCounted touch(Object var1); boolean release(); boolean release(int var1); } public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf> { xxx } //调用release进行内存释放 buf.release(); //或者用工具类 ReferenceCountUtil.release(buf);
-
ByteBuf不存在循环引用问题,采用引用计数法判断一个ByteBuf是否可被内存释放
-
ByteBuf 通常会在处理链上传递,因此我们应该在最后一次处理完成后才能释放该 ByteBuf
同时netty有头尾处理器,如果ByteBuf在尾处理器(有出入站之分)还没被释放,则会被其释放
-
-
零拷贝
此处的零拷贝指的不是操作系统层面的零拷贝,而是减少数据复制
- slice:切片,截取ByteBuf的一段,使用的是用一块内存空间,会互相影响;不可写入更多数据
- duplicate:截取全部,使用的是同一块内存空间,但读写指针互相独立,可写入更多数据,会互相影响
- composite:逻辑组装多个ByteBuf,避免内存复制,但难于维护读写指针
BootStrap、ServerBoostrap
客户端、服务端引导启动类
-
bootstrap使用方式及常用设置参数
bootstrap = new Bootstrap(); eventLoopGroup = new NioEventLoopGroup(); bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) //连接超时时间 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) .handler(xxx); bootstrap.connect(inetSocketAddress).addListener(xxx);
-
ServerBootStrap使用方式及常用设置参数
ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) //关闭Nagle算法,降低延迟 .childOption(ChannelOption.TCP_NODELAY, true) //开启TCP底层心跳,保持连接 .childOption(ChannelOption.SO_KEEPALIVE, true) //表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数 //ACCEPT队列 .option(ChannelOption.SO_BACKLOG, 128) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(初始化); b.bind(host, RpcConfig.getInstance().getNettyPort()).sync();
-
待补充一些高深点的…
Channel
Channel 是Netty对网络IO操作封装的抽象类,每个channel与一个eventloop绑定
-
基本 Api
Channel channel = channelFuture.channel(); //关闭channel channel.close(); //关闭channel,阻塞,或者关闭后执行回调 channel.closeFuture().sync(); channel.closeFuture().addListener(future -> { // 回调方法 }); //获取到处理器链,可添加对Channel里消息的处理器 ChannelPipeline pipeline = channel.pipeline(); //将消息写入Channel并刷出 channel.writeAndFlush("test");
-
ChannelFuture
对于channel连接和关闭都需要调用对应的
sync
,或者使用对应的回调方法
事件调度器
EventLoop
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
EventLoopGroup parent();
}
- eventLoop是单线程执行器(维护了一个
selector
),可以处理channel上的IO任务 - eventLoop都继承自线程池,可执行定时任务
- 还继承自OrderedEventExecutor,可处理有序的IO任务
EventLoopGroup
- EventLoopGroup是一组EventLoop,客户端连接时,新建的Channel会调用EventLoopGroup的
register()
方法,与其中的一个EventLoop绑定,后续这个Channel的IO事件都由这个EventLoop处理,保证了Channel io事件处理时的线程安全(不会有多个EventLoop处理同个Channel的事件) - NioEventLoop默认线程数时cpu核心数*2
- 细化分工
- 一个BossEventLoopGroup负责accept事件
- workerEventLoopGroup负责socketChannel 上的读写事件
- 再创建一个
serviceHandlerGroup
,用于处理业务逻辑,实现I/O线程和业务逻辑处理线程的隔离,同时还能并发执行Handler,提升性能
服务编排层
ChannelPipeline
- 可以向其中添加Handler处理器,channel上的消息依次被处理器处理,像流水线一样
ChannelHandler
- 分为入栈和出栈两种,顺序也是
- 入栈Handler一般是
ChannelInboundHandlerAdapter
- 出栈Handler一般是
ChannelOutboundHandlerAdapter
- 消息在pipeLine上被处理时,分为入栈和出栈,入栈时会跳过出栈处理器,反之同理
- 对于有状态的Handler,多个EventLoop不能使用同一个Handler,即是否线程安全
Netty对线程安全的Handler用@Sharable
注解 - 对于我们实现的Handler,判断是否线程安全时需要考虑ByteBuf传递的情况
ChannelHandlerContext
- 保存了channel的上下文
ctx.writeAndFlush()
和channel.writeAndFlush()
的区别- 前者从当前handler往前找出栈处理器
- 后者走到
tail
才往回依次找出栈处理器
IO模型
当我们发起IO操作时,操作系统需要经历两个阶段:
- 等待数据,如:等待客户端发起socket连接
- 拷贝数据,将数据从内核拷贝到用户空间
同步阻塞IO
准备数据、拷贝数据阶段都需要阻塞等待
同步非阻塞IO
准备数据阶段能返回当前数据是否准备完毕,拷贝数据阶段需要阻塞等待
IO多路复用(重点)
准备数据、拷贝数据阶段都需要阻塞等待,但一个线程可以获取到多个IO事件,集中处理
- 当事件被selector捕获时,说明数据已经准备好,无需再等待这一部分时间;也就是说只有在一个IO请求都没有的时候才会阻塞
- 基本原理:利用单个线程同时监听多个FD(文件描述符),在某个FD可读可写时得到通知,避免无效的等待
IO多路复用的具体实现
select
- 创建要监听的fd集合,用一个bit数组代表要监听的fd集合,最大1024个
- 调用select,拷贝到内核空间
- 内核依次扫描监听
- 将可读写的fd改成1,未就绪的置为0,返回就绪fd数量,并覆盖用户空间fd集合
- 再遍历就绪的fdset
缺点:
- fd_set监听的fd数量不能超过1024
- select无法得知哪个fd就绪,需要遍历整个fd_set
- 需要将整个fd_set从用户空间到内核空间来回拷贝各一次
poll
和select相比只改进了能监听的fd的数量,即使用链表存储,理论上无上限
但监听的fd越多,性能下降
epoll
- 使用一颗红黑树记录要监听的fd
- 使用一个链表,记录就绪的fd
epoll_create(int size)
:创建一个epoll结构体,返回对应的句柄epfdepoll_ctl()
:将一个fd添加到epoll的红黑树中,并设置回调函数,fd就绪时触发,添加到列表中epoll_wait()
:检查链表是否为空,不为空则返会就绪的FD数量- 这里空的event数组是重点,会存放就绪的fd!
- 优化点:
- 拷贝的次数(只需要再create时把fd拷贝过去)和数量(只拷贝就绪的)都大大减少
- poll的链表性能差,epoll红黑树性能好,监听fd的数量增加时对性能影响小
- 能直接知道哪些fd就绪,不需要再遍历一遍所有的fd
- epoll_wait的事件通知模式
- LevelTriggered:水平触发,会重复通知多次,直到数据被处理,默认
- EdgeTriggered:边缘触发,只会通知一次,不管数据是否被处理完
- eg:
socket发送了2kb的数据,服务端调用epoll_wait()
,只处理的1kb的数据,循环再次调用epoll_wait()
- LT:返回就绪fd数量,直到数据被处理完
- ET:返回0,不通知了
- LT的缺点:
- 惊群现象:每次梳理数据都会唤醒所有的进程来处理
- 多次重复通知有性能上的问题
- ET如何处理:
- 手动把FD放回链表
- 一次处理完所有数据(非阻塞循环读取)
Reactor线程模式
单线程单Reactor
-
单个线程负责所有的IO操作,所以的I/O的accept()、read()、write()以及connect()操作都在一个线程上完成
-
-
在netty中的代码体现
//netty使用的reactor模型是主从Reactor //这里开启的 EventLoopGroup 传入参数1,理解为只有一个线程来负责所有IO //不知道是否理解错了,欢迎评论探讨! NioEventLoopGroup group = new NioEventLoopGroup(1); ServerBootstrap b = new ServerBootstrap(); b.group(group, group);
多线程单Reactor
-
将业务操作分给worker线程池处理
-
-
netty里的代码体现
//多线程单Reactor //netty使用的reactor模型是主从Reactor //这里开启的 EventLoopGroup 传入参数1,理解为只有一个线程来负责所有IO //不知道是否理解错了,欢迎评论探讨! //多线程主要是在业务handler用上线程池 NioEventLoopGroup group = new NioEventLoopGroup(1); DefaultEventExecutorGroup serviceWorker = new DefaultEventExecutorGroup(16); ServerBootstrap b2 = new ServerBootstrap(); b2.group(group,group) //这里依旧是单reactor .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { //这里体现了多线程 channel.pipeline().addLast(serviceWorker,handler); } });
多线程多Reactor
-
进一步把IO操作细分,在高并发的环境下,优先级最高的自然是
accept
-
主reactor
- 负责处理accept
- 将对应socket分发给从reactor
- 创建一个Channel,并选择从reactor里的一个EventLoop进行绑定
-
在业务处理的handler中,使用线程池处理
-
-
在netty中主流的便是这种用法,举一个RPC项目中的例子
//这是主reactor,传入的参数与监听的端口数目一致(ServerSocketChannel) EventLoopGroup bossGroup = new NioEventLoopGroup(1); //这是从reactor池 EventLoopGroup workerGroup = new NioEventLoopGroup(); //这是负责处理业务的线程池 DefaultEventExecutorGroup serviceHandlerGroup = new DefaultEventExecutorGroup( RuntimeUtil.cpus() * 2, ThreadPoolFactoryUtil.createThreadFactory("service-handler-group", false)); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) //关闭Nagle算法,降低延迟 .childOption(ChannelOption.TCP_NODELAY, true) //开启TCP底层心跳,保持连接 .childOption(ChannelOption.SO_KEEPALIVE, true) //表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数 //ACCEPT队列 .option(ChannelOption.SO_BACKLOG, 128) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS)) .addLast(new RpcMessageEncoder()) .addLast(new RpcMessageDecoder()) // 实现I/O线程和业务逻辑处理线程的隔离,同时还能并发执行Handler,提升性能 .addLast(serviceHandlerGroup, new NettyRpcServerHandler()); } }); String host = InetAddress.getLocalHost().getHostAddress(); //绑定端口,建立连接 ChannelFuture channelFuture = b.bind(host, RpcConfig.getInstance().getNettyPort()).sync(); //阻塞,等待服务器关闭 channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("occur error when start server: ", e.getMessage()); } finally { log.info("shut down all EventLoopGroup"); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); serviceHandlerGroup.shutdownGracefully(); }
Netty中的设计模式
责任链模式
将一个请求沿着一条链传递,使得多个对象都有机会处理这个请求
-
责任处理器接口
ChannelHandler
ChannelInboundHandler
、ChannelOutBoundHandler
-
创建链,增加删除责任处理器接口
pipline
-
上下文
ChannelHandlerContext
:获取到channel
和executor
-
责任终止机制:
ctx.fireChannelRead()
是否调用
装饰者模式
-
好处:可以通过不修改代码的方式,一层一层的“装饰”类,使得类拥有越来越多的属性、功能
-
在Netty中的应用——
WrappedByteBuf
- 传入一个ByteBuf作为被装饰者,这个装饰类作为基类,被后续装饰者嵌套装饰
SimpleLeakAwareByteBuf
继承自WrappedByteBuf
,添加了在内存释放时的追踪功能
观察者模式
-
概述
- 分为两种角色:被观察者与观察者,分别实现接口
Observerable
和Observer
- 观察者中提供被通知方法;被观察者类中提供 观察者注册、移除和通知三个方法;
- 分为两种角色:被观察者与观察者,分别实现接口
-
在netty里的使用
//channelFuture就是一个被观察者 ChannelFuture channelFuture = channel.writeAndFlush("test"); //向被观察者channelFuture添加一个观察者 //当channelFuture写出消息时通知该观察者,也就是执行这个回调方法 channelFuture.addListener(future -> { //todo 1 });
参考资料
- bin的技术小屋⭐
- 黑马程序员Netty课程⭐
- JavaGuide RPC
- B站Netty源码解读
Q.E.D.