核心组件

网络通信层

ByteBuf

ByteBuf 是网络通信传输数据时的字节数组,可以看作是 Netty 对 Java NIOByteBuffer 字节容器的封装和抽象,更便于使用和更好的性能

  1. ByteBuf 的创建方式

    //直接内存
    ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
    //堆内存
    ByteBuf heapBuf = ByteBufAllocator.DEFAULT.heapBuffer();
    //直接内存
    ByteBuf directBuf = ByteBufAllocator.DEFAULT.directBuffer();
    
  2. 直接内存的特点

    • 直接内存创建和销毁的开销大,读写性能高(mmap,少一次内存拷贝),适合配合池化功能
    • 直接内存不受JVM内存回收管理,GC压力小,但建议及时主动释放
  3. 池化技术

    • 重用ByteBuf,可以减少ByteBuf的创建,对于直接内存减小开销,对于堆内存减小GC压力
    • 采用了jemalloc 类似的内存分配算法提升分配效率(记不住
    • 非安卓平台默认情况下都开启
  4. ByteBuf 的机制

    • ByteBuf 可扩容,可指定最大容量,默认情况为Integer.MAX_VLAUE
      image-20220817152302076

    • 读写指针,比 nio 的指针好用,不用切换读写模式

      • 可以获取读写指针,到时候再从这个位置读/写

      image-20220817152700994

    • 使用API,简单易用

      image-20220817153358500

  5. 内存回收

    • 提供统一个接口ReferenceCounted,每种 ByteBuf 都实现了该接口

      public interface ReferenceCounted {
          int refCnt();
          ReferenceCounted retain();
          ReferenceCounted retain(int var1);
          ReferenceCounted touch();
          ReferenceCounted touch(Object var1);
          boolean release();
          boolean release(int var1);
      }
      
      public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf> {
          xxx
      }
      
      //调用release进行内存释放
      buf.release();
      //或者用工具类
      ReferenceCountUtil.release(buf);
      
    • ByteBuf不存在循环引用问题,采用引用计数法判断一个ByteBuf是否可被内存释放

    • ByteBuf 通常会在处理链上传递,因此我们应该在最后一次处理完成后才能释放该 ByteBuf
      同时netty有头尾处理器,如果ByteBuf在尾处理器(有出入站之分)还没被释放,则会被其释放
      image-20220817160538343

  6. 零拷贝

    此处的零拷贝指的不是操作系统层面的零拷贝,而是减少数据复制

    • slice:切片,截取ByteBuf的一段,使用的是用一块内存空间,会互相影响;不可写入更多数据
    • duplicate:截取全部,使用的是同一块内存空间,但读写指针互相独立,可写入更多数据,会互相影响
    • composite:逻辑组装多个ByteBuf,避免内存复制,但难于维护读写指针

BootStrap、ServerBoostrap

客户端、服务端引导启动类

  1. bootstrap使用方式及常用设置参数

    bootstrap = new Bootstrap();
    eventLoopGroup = new NioEventLoopGroup();
    bootstrap.group(eventLoopGroup)
        .channel(NioSocketChannel.class)
        .handler(new LoggingHandler(LogLevel.INFO))
        //连接超时时间
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
        .handler(xxx);
    bootstrap.connect(inetSocketAddress).addListener(xxx);
    
  2. ServerBootStrap使用方式及常用设置参数

    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        //关闭Nagle算法,降低延迟
        .childOption(ChannelOption.TCP_NODELAY, true)
        //开启TCP底层心跳,保持连接
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        //表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
        //ACCEPT队列
        .option(ChannelOption.SO_BACKLOG, 128)
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(初始化);
    b.bind(host, RpcConfig.getInstance().getNettyPort()).sync();
    
  3. 待补充一些高深点的…

Channel

Channel 是Netty对网络IO操作封装的抽象类,每个channel与一个eventloop绑定

  1. 基本 Api

    Channel channel = channelFuture.channel();
    //关闭channel
    channel.close();
    //关闭channel,阻塞,或者关闭后执行回调
    channel.closeFuture().sync();
    channel.closeFuture().addListener(future -> {
        // 回调方法
    });
    //获取到处理器链,可添加对Channel里消息的处理器
    ChannelPipeline pipeline = channel.pipeline();
    //将消息写入Channel并刷出
    channel.writeAndFlush("test");
    
  2. ChannelFuture

    对于channel连接和关闭都需要调用对应的sync,或者使用对应的回调方法

事件调度器

EventLoop

public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
    EventLoopGroup parent();
}
  • eventLoop是单线程执行器(维护了一个selector),可以处理channel上的IO任务
  • eventLoop都继承自线程池,可执行定时任务
  • 还继承自OrderedEventExecutor,可处理有序的IO任务

EventLoopGroup

  • EventLoopGroup是一组EventLoop,客户端连接时,新建的Channel会调用EventLoopGroup的register()方法,与其中的一个EventLoop绑定,后续这个Channel的IO事件都由这个EventLoop处理,保证了Channel io事件处理时的线程安全(不会有多个EventLoop处理同个Channel的事件)
  • NioEventLoop默认线程数时cpu核心数*2
  • 细化分工
    • 一个BossEventLoopGroup负责accept事件
    • workerEventLoopGroup负责socketChannel 上的读写事件
    • 再创建一个serviceHandlerGroup,用于处理业务逻辑,实现I/O线程和业务逻辑处理线程的隔离,同时还能并发执行Handler,提升性能

服务编排层

ChannelPipeline

  • 可以向其中添加Handler处理器,channel上的消息依次被处理器处理,像流水线一样

ChannelHandler

  • 分为入栈和出栈两种,顺序也是
  • 入栈Handler一般是ChannelInboundHandlerAdapter
  • 出栈Handler一般是ChannelOutboundHandlerAdapter
  • 消息在pipeLine上被处理时,分为入栈和出栈,入栈时会跳过出栈处理器,反之同理
  • 对于有状态的Handler,多个EventLoop不能使用同一个Handler,即是否线程安全
    Netty对线程安全的Handler用@Sharable注解
  • 对于我们实现的Handler,判断是否线程安全时需要考虑ByteBuf传递的情况

ChannelHandlerContext

  • 保存了channel的上下文
  • ctx.writeAndFlush()channel.writeAndFlush()的区别
    • 前者从当前handler往前找出栈处理器
    • 后者走到tail才往回依次找出栈处理器

IO模型

当我们发起IO操作时,操作系统需要经历两个阶段:

  • 等待数据,如:等待客户端发起socket连接
  • 拷贝数据,将数据从内核拷贝到用户空间

同步阻塞IO

准备数据、拷贝数据阶段都需要阻塞等待

image-20220816145040149

同步非阻塞IO

准备数据阶段能返回当前数据是否准备完毕,拷贝数据阶段需要阻塞等待

image-20220816150245453

IO多路复用(重点)

准备数据、拷贝数据阶段都需要阻塞等待,但一个线程可以获取到多个IO事件,集中处理

  • 当事件被selector捕获时,说明数据已经准备好,无需再等待这一部分时间;也就是说只有在一个IO请求都没有的时候才会阻塞
  • 基本原理:利用单个线程同时监听多个FD(文件描述符),在某个FD可读可写时得到通知,避免无效的等待
  • image-20220816150730155

IO多路复用的具体实现

select

  • 创建要监听的fd集合,用一个bit数组代表要监听的fd集合,最大1024个
  • 调用select,拷贝到内核空间
  • 内核依次扫描监听
  • 将可读写的fd改成1,未就绪的置为0,返回就绪fd数量,并覆盖用户空间fd集合
  • 再遍历就绪的fdset

image-20220816151438590

缺点:

  • fd_set监听的fd数量不能超过1024
  • select无法得知哪个fd就绪,需要遍历整个fd_set
  • 需要将整个fd_set从用户空间到内核空间来回拷贝各一次

poll

image-20220816160648994

和select相比只改进了能监听的fd的数量,即使用链表存储,理论上无上限
但监听的fd越多,性能下降

epoll

  • 使用一颗红黑树记录要监听的fd
  • 使用一个链表,记录就绪的fd
  • epoll_create(int size):创建一个epoll结构体,返回对应的句柄epfd
  • epoll_ctl():将一个fd添加到epoll的红黑树中,并设置回调函数,fd就绪时触发,添加到列表中
  • epoll_wait():检查链表是否为空,不为空则返会就绪的FD数量
    • image-20220816165301885
    • 这里空的event数组是重点,会存放就绪的fd!
  • 优化点:
    • 拷贝的次数(只需要再create时把fd拷贝过去)和数量(只拷贝就绪的)都大大减少
    • poll的链表性能差,epoll红黑树性能好,监听fd的数量增加时对性能影响小
    • 能直接知道哪些fd就绪,不需要再遍历一遍所有的fd
  • epoll_wait的事件通知模式
    • LevelTriggered:水平触发,会重复通知多次,直到数据被处理,默认
    • EdgeTriggered:边缘触发,只会通知一次,不管数据是否被处理完
    • eg:
      socket发送了2kb的数据,服务端调用epoll_wait(),只处理的1kb的数据,循环再次调用epoll_wait()
      • LT:返回就绪fd数量,直到数据被处理完
      • ET:返回0,不通知了
    • LT的缺点:
      • 惊群现象:每次梳理数据都会唤醒所有的进程来处理
      • 多次重复通知有性能上的问题
    • ET如何处理:
      • 手动把FD放回链表
      • 一次处理完所有数据(非阻塞循环读取)

Reactor线程模式

单线程单Reactor

  • 单个线程负责所有的IO操作,所以的I/O的accept()、read()、write()以及connect()操作都在一个线程上完成

  • image-20220818111930001

  • 在netty中的代码体现

    //netty使用的reactor模型是主从Reactor
    //这里开启的 EventLoopGroup 传入参数1,理解为只有一个线程来负责所有IO
    //不知道是否理解错了,欢迎评论探讨!
    NioEventLoopGroup group = new NioEventLoopGroup(1);
    ServerBootstrap b = new ServerBootstrap();
    b.group(group, group);
    

多线程单Reactor

  • 将业务操作分给worker线程池处理

  • image-20220818142534440

  • netty里的代码体现

    //多线程单Reactor
    //netty使用的reactor模型是主从Reactor
    //这里开启的 EventLoopGroup 传入参数1,理解为只有一个线程来负责所有IO
    //不知道是否理解错了,欢迎评论探讨!
    //多线程主要是在业务handler用上线程池
    NioEventLoopGroup group = new NioEventLoopGroup(1);
    DefaultEventExecutorGroup serviceWorker = new DefaultEventExecutorGroup(16);
    ServerBootstrap b2 = new ServerBootstrap();
    b2.group(group,group) //这里依旧是单reactor
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel channel) throws Exception {
                //这里体现了多线程
                channel.pipeline().addLast(serviceWorker,handler);
            }
        });
    

多线程多Reactor

  • 进一步把IO操作细分,在高并发的环境下,优先级最高的自然是accept

  • 主reactor

    • 负责处理accept
    • 将对应socket分发给从reactor
    • 创建一个Channel,并选择从reactor里的一个EventLoop进行绑定
  • 在业务处理的handler中,使用线程池处理

  • image-20220818150643864

  • 在netty中主流的便是这种用法,举一个RPC项目中的例子

    //这是主reactor,传入的参数与监听的端口数目一致(ServerSocketChannel)
    EventLoopGroup bossGroup = new NioEventLoopGroup(1); 
    //这是从reactor池
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    //这是负责处理业务的线程池
    DefaultEventExecutorGroup serviceHandlerGroup = new DefaultEventExecutorGroup(
        RuntimeUtil.cpus() * 2,
        ThreadPoolFactoryUtil.createThreadFactory("service-handler-group", false));
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            //关闭Nagle算法,降低延迟
            .childOption(ChannelOption.TCP_NODELAY, true)
            //开启TCP底层心跳,保持连接
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            //表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
            //ACCEPT队列
            .option(ChannelOption.SO_BACKLOG, 128)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel channel) throws Exception {
                    ChannelPipeline pipeline = channel.pipeline();
                    pipeline.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS))
                        .addLast(new RpcMessageEncoder())
                        .addLast(new RpcMessageDecoder())
                        // 实现I/O线程和业务逻辑处理线程的隔离,同时还能并发执行Handler,提升性能
                        .addLast(serviceHandlerGroup, new NettyRpcServerHandler());
                }
            });
        String host = InetAddress.getLocalHost().getHostAddress();
        //绑定端口,建立连接
        ChannelFuture channelFuture = b.bind(host, RpcConfig.getInstance().getNettyPort()).sync();
        //阻塞,等待服务器关闭
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        log.error("occur error when start server: ", e.getMessage());
    } finally {
        log.info("shut down all EventLoopGroup");
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
        serviceHandlerGroup.shutdownGracefully();
    }
    

Netty中的设计模式

责任链模式

将一个请求沿着一条链传递,使得多个对象都有机会处理这个请求

  • 责任处理器接口

    • ChannelHandler
    • ChannelInboundHandlerChannelOutBoundHandler
  • 创建链,增加删除责任处理器接口

    • pipline
  • 上下文

    • ChannelHandlerContext:获取到channelexecutor
  • 责任终止机制:

    • ctx.fireChannelRead()是否调用

装饰者模式

  1. 装饰者模式概述

  2. 好处:可以通过不修改代码的方式,一层一层的“装饰”类,使得类拥有越来越多的属性、功能

  3. 在Netty中的应用——WrappedByteBuf

    • 传入一个ByteBuf作为被装饰者,这个装饰类作为基类,被后续装饰者嵌套装饰
    • SimpleLeakAwareByteBuf继承自WrappedByteBuf,添加了在内存释放时的追踪功能

观察者模式

  1. 概述

    • 分为两种角色:被观察者与观察者,分别实现接口ObserverableObserver
    • 观察者中提供被通知方法;被观察者类中提供 观察者注册、移除和通知三个方法;
  2. 在netty里的使用

    //channelFuture就是一个被观察者
    ChannelFuture channelFuture = channel.writeAndFlush("test");
    //向被观察者channelFuture添加一个观察者
    //当channelFuture写出消息时通知该观察者,也就是执行这个回调方法
    channelFuture.addListener(future -> {
        //todo 1
    });
    

参考资料

Q.E.D.


记录 • 分享 • 日常