Netty的数据传输载体——ByteBuf介绍

发布于2024-12-12
字数: 11354
AQS

在netty中,ByteBuf是数据传输的载体,数据的读写都是以ByteBuf为单位进行交互的。

Java原生NIO

Java的原生NIO也实现了一个ByteBuffer,Netty自己也实现了一个,因为原生的不好用。至于为什么不好用,看下面的结构就知道了。

ByteBuffer有三个重要的属性:

  • position:当前位置,具体的意义要看当前处于哪种模式
    • 写模式:当前写的位置,初始值为0,最大值为capacity - 1,每次往ByteBuffer写入一个数据,position就会随之后移一个位置
    • 读模式:当前读的位置,读一个数据,就往前移动一位
  • limit:ByteBuffer中有效数据长度大小,具体的意义要看当前处于哪种模式
    • 写模式:表示当前ByteBuffer还可以写入多少数据,此时它的值等于capacity
    • 读模式:表示当前ByteBuffer最多还有多少数据可读
  • capacity:表示当前ByteBuffer最大的的可写容量

我们用画一张流转示意图来帮助理解。

从图中可以看到,原生的ByteBuffer操作很繁琐,需要频繁的调用flip模式切换读写模式,并且不支持扩容,在实际开发的时候不好控制分配容量,分多了浪费,分少了报BufferOverflowException。因此Netty自己实现了一个ByteBuf,支持动态扩容、读写指针分离、引用计数等。

ByteBuf结构

从图中可以看出,ByteBuf由三个指针分成了四部分,三个指针分别是读指针、写指针、最大容量,四个部分分别是:

  • 已废弃字节:已经丢弃的字节,可以使用discardReadBytes()释放这部分空间
  • 可读字节:表示可以读取的数据,大小为writerIndex - readerIndex,每读取一个字节,readerIndex会自增一,直到readerIndex == writerIndex时,就表示不可读
  • 可写字节:表示可写入字节,每写入一个字节writerIndex就会自增一,如果writerIndex > capacity时就会触发扩容
  • 可扩容字节:表示ByteBuf最多可以扩容多少个字节,当writerIndex > capacity时就会调用calculateNewCapacity()扩容,如果超出maxCapacity时会抛出IndexOutOfBoundsException异常

接下来一样画张图看看指针变化情况:

ByteBuf核心API

容量API

  • capacity(): 表示ByteBuf底层占用了多少字节的内存(包括丢弃的字节、可读字节、可写字节)
  • maxCapacity():表示ByteBuf底层最大能够占用多少字节的内存,当向ByteBuf中写数据的时候,如果发现容量不足,则进行扩容,直到扩容到maxCapacity,超过这个数,就抛出异常。
  • readableBytes()与isReadable()readableBytes()表示ByteBuf当前可读的字节数,它的值等于writerIndex - readerIndex,如果两者相等,则不可读,isReadable()方法返回false
  • writableBytes()、isWritable()与maxWritableBytes()writableBytes()表示ByteBuf当前可写的字节数,它的值等于capacity - writerIndex,如果两者相等,则表示不可写,isWritable()返回false,但是这个时候,并不代表不能往ByteBuf写数据了。如果发现往ByteBuf写数据写不进去,Netty会自动扩容ByteBuf,直到底层的内存大小为maxCapacity,而maxWritableBytes()就表示可写的最大字节数,它的值maxCapacitywriterIndex

读写API

读写API太多了,挑几个写一下。

  • markReaderIndex()与resetReaderIndex():前者表示把当前的读指针保存起来,后者表示把当前的读指针恢复到之前保存的值。下面两段代码是等价的。

    java 复制代码
    // 代码片段一
    int readerIndex = buffer.readerIndex();
    // 其他操作
    buffer.readerIndex(readerIndex);
    // 代码片段二
    buffer.markReaderIndex();
    // 其他操作
    buffer.resetReaderIndex();

    这段代码出自《跟着闪电侠学Netty》,建议大家使用第二种,这样不用创建变量,buffer传递到哪里都可以调用resetReaderIndex()恢复之前的读位置

  • release()与retain():Netty使用的是堆外内存,通过引用计数的办法来管理内存,一般来说创建一个ByteBuf,引用为一,调用retain()就加一,调用release()引用减一,如果减完后为0就会回收底层使用的内存。

  • slice()、duplicate()、copy():这三个方法都会返回一个新的ByteBuf对象

    • slice(),从原始的ByteBuf中截取readerIndex ~ writerIndex数据,并且返回新的ByteBuf最大容量是原始ByteBufreadableBytes()。新的ByteBuf底层分配的内存、引用计数与原始ByteBuf共享,所以新的ByteBuf调用write系列的方法都会影响到原始ByteBuf数据。
    • duplicate():与slice()不同,它是将整个ByteBuf截取出来,包括数据、指针信息。返回新的ByteBuf底层分配内存、引用计数和原始ByteBuf共享。
    • copy():从原始ByteBuf中拷贝所有信息,相当于深拷贝,因此返回新的ByteBuf调用write系列方法不会影响原始ByteBuf

此外,slice()方法和duplicate()方法不会改变ByteBuf的引用计数,所以如果原始的ByteBuf引用计数为0后释放内存,这两个方法返回的新的ByteBuf也会被释放,这个时候就不能对其读写了,使用的时候需要注意这一点。

示例

下面用一段代码来演示ByteBuf的核心API,先创建一个打印的工具类方法:

java 复制代码
public static void printByteBuf(ByteBuf byteBuf, String action) {
        System.out.println("=============" + action + "=============");
        System.out.println("capacity = " + byteBuf.capacity());
        System.out.println("maxCapacity = " + byteBuf.maxCapacity());
        System.out.println("readerIndex = " + byteBuf.readerIndex());
        System.out.println("readableBytes = " + byteBuf.readableBytes());
        System.out.println("isReadable = " + byteBuf.isReadable());
        System.out.println("writerIndex = " + byteBuf.writerIndex());
        System.out.println("writableBytes = " + byteBuf.writableBytes());
        System.out.println("isWritable = " + byteBuf.isWritable());
        System.out.println();
}

接下来调用各个API。

java 复制代码
public static void doActionByteBuf() {
    // 申请一个初始容量为15,最大容量为20的ByteBuf
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(15, 20);
    printByteBuf(buffer, "申请一个初始容量为15,最大容量为20的ByteBuf");

    // 写入6个字节
    buffer.writeBytes(new byte[]{1, 2, 3, 4, 5, 6});
    printByteBuf(buffer, "写入6个字节");

    // 读取四个字节
    buffer.readBytes(4);
    printByteBuf(buffer, "读取四个字节");

    // 再读取两个字节
    buffer.readBytes(new byte[2]);
    printByteBuf(buffer, "再读取两个字节");

    // 再次写入8个字节
    buffer.writeBytes(new byte[]{7, 8, 9, 10, 11, 12, 13, 14});
    printByteBuf(buffer, "再次写入8个字节");

    // getXxx()方法获取ByteBuf的值
    // getByte(index)获取缓冲区中指定索引处的字节。不会修改此缓冲区的readerIndex或writerIndex。
    System.out.println("getByte(3) = " + buffer.getByte(3));
    printByteBuf(buffer, "getByte(3)方法获取ByteBuf的值");

    // setXxx()方法修改ByteBuf的值,不会修改此缓冲区的readerIndex或writerIndex。
    buffer.setByte(8, 1);
    printByteBuf(buffer, "setByte(8, 1)修改ByteBuf的值");

    // slice
    ByteBuf sliceByte = buffer.slice();
    printByteBuf(sliceByte, "slice()方法");

    printByteBuf(buffer, "slice()方法后打印原始ByteBuf");

    // duplicate
    ByteBuf duplicateByte = buffer.duplicate();
    printByteBuf(duplicateByte, "duplicate()方法");

    printByteBuf(buffer, "duplicate()方法后打印原始ByteBuf");

    // 读取 sliceByte
    sliceByte.readInt();
    printByteBuf(sliceByte, "读取 sliceByte.readInt()");

    // 读取 duplicateByte
    duplicateByte.readInt();
    printByteBuf(duplicateByte, "读取 duplicateByte.readInt()");

    // 读取后的原始ByteBuf 这里运行结果表面 duplicateByte 和 sliceByte 的读取操作不会影响原始ByteBuf
    printByteBuf(buffer, "读取后的原始ByteBuf");

    // 写duplicateByte
    duplicateByte.writeBytes(new byte[]{1});
    printByteBuf(duplicateByte, "duplicateByte.writeBytes(new byte[1])");

    // 写完后原始ByteBuf
    byte[] bytes = new byte[8];
    printByteBuf(buffer, "写完后原始ByteBuf");
    buffer.readBytes(bytes);

    System.out.println("setByte(8,123) 之前,duplicateByte = " + duplicateByte.getByte(8));
    System.out.println("setByte(8,123) 之前,byteBuf = " + buffer.getByte(8));

    duplicateByte.setByte(8, 123);
    System.out.println("setByte(8,123) 之后,duplicateByte = " + duplicateByte.getByte(8));
    System.out.println("setByte(8,123) 之后,byteBuf = " + buffer.getByte(8));

    // 再写2个字节
    buffer.writeBytes(new byte[]{15, 16});
    printByteBuf(buffer, "写 2 个字节后");

    // 再写1个
    buffer.writeBytes(new byte[]{17});
    printByteBuf(buffer, "写 1 个字节后");

    // 再写5个触发异常
    //buffer.writeBytes(new byte[]{18, 19, 20, 21, 22});

    ByteBuf copyByte = buffer.copy();
    printByteBuf(copyByte, "copy()方法");

    copyByte.writeBytes(new byte[]{18});
    printByteBuf(copyByte, "copy()方法后写1个字节");

    printByteBuf(buffer, "copy()方法后写1个字节后原始ByteBuf");

    // 释放已废弃的空间
    buffer.discardReadBytes();
    printByteBuf(buffer, "释放已废弃的空间");
}

执行结果:

bash 复制代码
=============申请一个初始容量为15,最大容量为20的ByteBuf=============
capacity = 15
maxCapacity = 20
readerIndex = 0
readableBytes = 0
isReadable = false
writerIndex = 0
writableBytes = 15
isWritable = true

=============写入6个字节=============
capacity = 15
maxCapacity = 20
readerIndex = 0
readableBytes = 6
isReadable = true
writerIndex = 6
writableBytes = 9
isWritable = true

=============读取四个字节=============
capacity = 15
maxCapacity = 20
readerIndex = 4
readableBytes = 2
isReadable = true
writerIndex = 6
writableBytes = 9
isWritable = true

=============再读取两个字节=============
capacity = 15
maxCapacity = 20
readerIndex = 6
readableBytes = 0
isReadable = false
writerIndex = 6
writableBytes = 9
isWritable = true

=============再次写入8个字节=============
capacity = 15
maxCapacity = 20
readerIndex = 6
readableBytes = 8
isReadable = true
writerIndex = 14
writableBytes = 1
isWritable = true

getByte(3) = 4
=============getByte(3)方法获取ByteBuf的值=============
capacity = 15
maxCapacity = 20
readerIndex = 6
readableBytes = 8
isReadable = true
writerIndex = 14
writableBytes = 1
isWritable = true

=============setByte(8, 1)修改ByteBuf的值=============
capacity = 15
maxCapacity = 20
readerIndex = 6
readableBytes = 8
isReadable = true
writerIndex = 14
writableBytes = 1
isWritable = true

=============slice()方法=============
capacity = 8
maxCapacity = 8
readerIndex = 0
readableBytes = 8
isReadable = true
writerIndex = 8
writableBytes = 0
isWritable = false

=============slice()方法后打印原始ByteBuf=============
capacity = 15
maxCapacity = 20
readerIndex = 6
readableBytes = 8
isReadable = true
writerIndex = 14
writableBytes = 1
isWritable = true

=============duplicate()方法=============
capacity = 15
maxCapacity = 20
readerIndex = 6
readableBytes = 8
isReadable = true
writerIndex = 14
writableBytes = 1
isWritable = true

=============duplicate()方法后打印原始ByteBuf=============
capacity = 15
maxCapacity = 20
readerIndex = 6
readableBytes = 8
isReadable = true
writerIndex = 14
writableBytes = 1
isWritable = true

=============读取 sliceByte.readInt()=============
capacity = 8
maxCapacity = 8
readerIndex = 4
readableBytes = 4
isReadable = true
writerIndex = 8
writableBytes = 0
isWritable = false

=============读取 duplicateByte.readInt()=============
capacity = 15
maxCapacity = 20
readerIndex = 10
readableBytes = 4
isReadable = true
writerIndex = 14
writableBytes = 1
isWritable = true

=============读取后的原始ByteBuf=============
capacity = 15
maxCapacity = 20
readerIndex = 6
readableBytes = 8
isReadable = true
writerIndex = 14
writableBytes = 1
isWritable = true

=============duplicateByte.writeBytes(new byte[1])=============
capacity = 15
maxCapacity = 20
readerIndex = 10
readableBytes = 5
isReadable = true
writerIndex = 15
writableBytes = 0
isWritable = false

=============写完后原始ByteBuf=============
capacity = 15
maxCapacity = 20
readerIndex = 6
readableBytes = 8
isReadable = true
writerIndex = 14
writableBytes = 1
isWritable = true

setByte(8,123) 之前,duplicateByte = 1
setByte(8,123) 之前,byteBuf = 1
setByte(8,123) 之后,duplicateByte = 123
setByte(8,123) 之后,byteBuf = 123
=============写 2 个字节后=============
capacity = 20
maxCapacity = 20
readerIndex = 14
readableBytes = 2
isReadable = true
writerIndex = 16
writableBytes = 4
isWritable = true

=============写 1 个字节后=============
capacity = 20
maxCapacity = 20
readerIndex = 14
readableBytes = 3
isReadable = true
writerIndex = 17
writableBytes = 3
isWritable = true

=============copy()方法=============
capacity = 3
maxCapacity = 20
readerIndex = 0
readableBytes = 3
isReadable = true
writerIndex = 3
writableBytes = 0
isWritable = false

=============copy()方法后写1个字节=============
capacity = 20
maxCapacity = 20
readerIndex = 0
readableBytes = 4
isReadable = true
writerIndex = 4
writableBytes = 16
isWritable = true

=============copy()方法后写1个字节后原始ByteBuf=============
capacity = 20
maxCapacity = 20
readerIndex = 14
readableBytes = 3
isReadable = true
writerIndex = 17
writableBytes = 3
isWritable = true

=============释放已废弃的空间=============
capacity = 20
maxCapacity = 20
readerIndex = 0
readableBytes = 3
isReadable = true
writerIndex = 3
writableBytes = 17
isWritable = true


Process finished with exit code 0

这里图中画的不够全,最后结合执行结果分析各个核心API对ByteBuf的影响。

总结

ByteBuf是Netty为解决Java原始NIO缺陷自己实现的数据传输载体,具有动态扩容、读写指针分离、支持引用计数零拷贝等特点,此外在使用slice()、duplicate()时需要注意与原始的ByteBuf共享引用计数和底层内存,但是不共享读写指针。