Netty-核心组件

本文最后更新于:2 年前

观念理解

  • channel:数据的传输通道。
  • msg:流动的数据。
  • pipeline:流水线,上面有多道工序。
  • handler:数据处理的工序。分为 inbound,outbound。
  • eventloop:处理数据的工人。
    • 工人可以管理多个 channel 的 io 操作(一个工人内部有 selector 和 thread)。一旦工人负责了某个 channel,就要负责到底。(线程安全)。
    • 工人既可以执行 io 操作,还可以进行任务处理,工人都有自己的任务队列。
    • 工人按照 pipeline 的顺序,依次按照对应的 handler 进行。

组件

Eventloop(事件循环对象)

  • 本质上是一个单线程执行器(同时维护了一个 selector)。
  • 用来处理 channel 上源源不断的 io 事件。(监听网络事件并调用事件处理器进行相关的 IO 操作)
  • eventloop 负责处理注册到其上的 channel 的 io 操作。
  • 继承了 juc 下的 scheduledExecutorService(包含了线程池的方法)
  • 继承了 netty 自己的 oedereventexecutor(判断一个线程是否属于此 eventloop,判断一个 eventloop 属于哪个 eventloopgroup)

EventloopGroup(事件循环组)

  • 一组 eventloop。
  • channel 会调用他的 register 方法绑定一个 eventloop,后续 io 操作由他负责到底。
  • EventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理,即 Thread 和 EventLoop 属于 1 : 1 的关系,从⽽保证线程安全
  • NioEventLoopGroup 默认的构造函数实际会起的线程数为 CPU 核心*2 。
    每个 NioEventLoopGroup 对象内部都会分配⼀组 NioEventLoop ,其⼤⼩是 nThreads , 这样就构成了⼀个线程池, ⼀个 NIOEventLoop 和⼀个线程相对应。

channel

传送数据的传输通道,或者说对网络操作的抽象。
包括 bind(),connect(),read(),write()等方法。
常用 nioserversocketchannel 与 niosocketchannel。
close();
closefuture();
pipeline();
write();
writeandflush();数据写入并刷出。

Netty 是异步⾮阻塞的,我们不能⽴刻得到操作是否执⾏成功,但是,你可以通过 ChannelFuture 接⼝addListener() ⽅法注册⼀个 ChannelFutureListener ,当操作执⾏成功或者失败时,监听就会⾃动触发返回结果。并且,你还可以通过 ChannelFuture 的 channel() ⽅法获取关联的 Channel。

  • channlefuture 异步非阻塞的解决:
    1,sync();
    2,addlistener(回调对象)异步处理结果。(等待结果的也不是主线程)

为什么要异步?

必须多线程,多核 cpu。
并没有缩短响应时间,而是吞吐量。
必须合理的任务拆分

future&promise

future 继承 jdk 的 future。
promise 继承自 future。
jdk:只能同步等待任务结束。

netty future:可以异步等待方法结束。也要等待任务结束。

promise:只作为两个线程传递结果的容器。

handler&pipeline

  • handler:用来处理 channel 上各种事件,分为入站和出站。
  • 注意:只有向 channel 中写数据才会触发出站 handler。
  • 出站顺序是从后往前。
  • ctx.writeansflush(),会从那个入站向前找。后面的出站处理器就不会执行了。
  • 1-2-3-6-5-4。

Bootstrap 和 ServerBootstrap

image-20210804201619460

image-20210804201633173

Bootstrap 通常使⽤ connet() ⽅法连接到远程的主机和端⼝,作为⼀个 Netty TCP 协议通
信中的客户端。另外, Bootstrap 也可以通过 bind() ⽅法绑定本地的⼀个端⼝,作为 UDP
协议通信中的⼀端。

  1. ServerBootstrap 通常使⽤ bind() ⽅法绑定本地的端⼝上,然后等待客户端的连接。
  2. Bootstrap 只需要配置⼀个线程组— EventLoopGroup 。⽽ ServerBootstrap 需要配置两个线
    程组— EventLoopGroup ,⼀个⽤于处理连接,⼀个⽤于具体的处理。

bytebuf

对 nio 中的增强。

支持:
1,直接内存。创建销毁比较昂贵。读写性能高。(默认)
堆内存。受到 GC 影响。
2,池化的最大意义在于可以重用 ByteBuf。

  • 没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力
  • 有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
  • 高并发时,池化功能更节约内存,减少内存溢出的可能
    4.1 之后非安卓默认池化。
    参数设定:netty.allocator.type={polled|unpolled}

组成:

image-20210804201545205

可以指定最大容量。

内存回收
由于 Netty 中有堆外内存的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。

  • UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
  • UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
  • PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存

Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口

  • 每个 ByteBuf 对象的初始计数为 1
  • 调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
  • 调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
  • 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用
    扩容
  • 如何写入后数据大小未超过 512,则选择下一个 16 的整数倍,例如写入后大小为 12 ,则扩容后 capacity 是 16
  • 如果写入后数据大小超过 512,则选择下一个 2^n,例如写入后大小为 513,则扩容后 capacity 是 2^10=1024(2^9=512 已经不够了)
  • 扩容不能超过 max capacity 会报错

ByteBuf 优势

  • 池化 - 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能
  • 读写指针分离,不需要像 ByteBuffer 一样切换读写模式
  • 可以自动扩容
  • 支持链式调用,使用更流畅
  • 很多地方体现零拷贝,例如 slice、duplicate、CompositeByteBuf

netty 服务端与客户端启动过程

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.out.println(msg);
}
});
}
})
.bind(8080); // 4

1,创建两个 NioEventLoopGroup 对象实例: bossGroup 和 workerGroup 。
bossGroup : ⽤于处理客户端的 TCP 连接请求。
workerGroup : 负责每⼀条连接的具体读写数据的处理逻辑,真正负责 I/O 读写操作,交
由对应的 Handler 处理。
2,创建了⼀个服务端启动引导/辅助类: ServerBootstrap ,这个类将引导我们进⾏服
务端的启动⼯作。
3,通过 .group() ⽅法给引导类 ServerBootstrap 配置两⼤线程组,确定了线程模型。
4,通过 channel() ⽅法给引导类 ServerBootstrap 指定了 IO 模型为 NIO
5,通过 .childHandler() 给引导类创建⼀个 ChannelInitializer ,然后指定了服务端消息的业务
处理逻辑 HelloServerHandler 对象
6,调⽤ ServerBootstrap 类的 bind() ⽅法绑定端⼝

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
new Bootstrap()
.group(new NioEventLoopGroup()) // 1
.channel(NioSocketChannel.class) // 2
.handler(new ChannelInitializer<Channel>() { // 3
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder()); // 8
}
})
.connect("127.0.0.1", 8080) // 4
.sync() // 5
.channel() // 6
.writeAndFlush(new Date() + ": hello world!"); // 7

1.创建⼀个 NioEventLoopGroup 对象实例
2.创建客户端启动的引导类是 Bootstrap
3.通过 .group() ⽅法给引导类 Bootstrap 配置⼀个线程组
4.通过 channel() ⽅法给引导类 Bootstrap 指定了 IO 模型为 NIO
5.通过 .childHandler() 给引导类创建⼀个 ChannelInitializer ,然后指定了客户端消息的业务处理
逻辑 HelloClientHandler 对象
6.调⽤ Bootstrap 类的 connect() ⽅法进⾏连接,这个⽅法需要指定两个参数:
inetHost : ip 地址
inetPort : 端⼝号