public class UnpooledDirectByteBuf extends AbstractReferenceCountedByteBuf { private final ByteBufAllocator alloc;
//jdk ByteBuffer管理直接内存
private ByteBuffer buffer;
private int capacity;
@Override
public boolean hasArray() {
return false;
}
@Override
public int nioBufferCount() {
return 1;
} //设置ByteBuffer,替换的话释放ByteBuffer
private void setByteBuffer(ByteBuffer buffer) {
ByteBuffer oldBuffer = this.buffer;
if (oldBuffer != null) {
if (doNotFree) {
doNotFree = false;
} else {
//freeDirect(oldBuffer);
//调用PlatformDependent工具处理
PlatformDependent.freeDirectBuffer(buffer);
}
} this.buffer = buffer;
tmpNioBuf = null;
capacity = buffer.remaining();
}
@Override
public ByteBuf capacity(int newCapacity) {
ensureAccessible();
if (newCapacity < 0 || newCapacity > maxCapacity()) {
throw new IllegalArgumentException("newCapacity: " + newCapacity);
} int readerIndex = readerIndex();
int writerIndex = writerIndex(); int oldCapacity = capacity;
//扩容处理
if (newCapacity > oldCapacity) {
ByteBuffer oldBuffer = buffer;
ByteBuffer newBuffer = allocateDirect(newCapacity);
//将整个oldBuffer复制到newBuffer
oldBuffer.position(0).limit(oldBuffer.capacity());
newBuffer.position(0).limit(oldBuffer.capacity());
newBuffer.put(oldBuffer);
//clear是重置容器坐标,不是清理数据
newBuffer.clear();
setByteBuffer(newBuffer);
} else if (newCapacity < oldCapacity) {
//缩容处理跟heap一样
ByteBuffer oldBuffer = buffer;
ByteBuffer newBuffer = allocateDirect(newCapacity);
if (readerIndex < newCapacity) {
if (writerIndex > newCapacity) {
writerIndex(writerIndex = newCapacity);
}
oldBuffer.position(readerIndex).limit(writerIndex);
newBuffer.position(readerIndex).limit(writerIndex);
newBuffer.put(oldBuffer);
newBuffer.clear();
} else {
setIndex(newCapacity, newCapacity);
}
setByteBuffer(newBuffer);
}
return this;
} @Override
public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
checkSrcIndex(index, length, srcIndex, src.capacity());
if (src.nioBufferCount() > 0) {
for (ByteBuffer bb: src.nioBuffers(srcIndex, length)) {
int bbLen = bb.remaining();
setBytes(index, bb);
index += bbLen;
}
} else {
src.getBytes(srcIndex, this, index, length);
}
return this;
} @Override
public ByteBuf setBytes(int index, ByteBuffer src) {
ensureAccessible();
ByteBuffer tmpBuf = internalNioBuffer();
if (src == tmpBuf) {
src = src.duplicate();
}
//tmpBuf 是指向 this.buffer 通过put(src)api指加src里的内容
//指加之前要设置position开始坐标同结束边界limit
tmpBuf.clear().position(index).limit(index + src.remaining());
tmpBuf.put(src);
return this;
} @Override
public ByteBuffer nioBuffer(int index, int length) {
checkIndex(index, length);
//duplicate 是创建一个共享的ByteBuffer,slice也是创建共享,只不过是bufferr其中一部分
//创建新的ByteBuffer区别是内部坐标记录指向新的ByteBuffer对象,内容是共享的
return ((ByteBuffer) buffer.duplicate().position(index).limit(index + length)).slice();
} //处理逻辑跟setBytes(int index, ByteBuf src, int srcIndex, int length) 一样,对换src dst即可
@Override
public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
checkDstIndex(index, length, dstIndex, dst.capacity());
if (dst.hasArray()) {
getBytes(index, dst.array(), dst.arrayOffset() + dstIndex, length);
} else if (dst.nioBufferCount() > 0) {
for (ByteBuffer bb: dst.nioBuffers(dstIndex, length)) {
int bbLen = bb.remaining();
getBytes(index, bb);
index += bbLen;
}
} else {
dst.setBytes(dstIndex, this, index, length);
}
return this;
}
@Override
public ByteBuf getBytes(int index, ByteBuffer dst) {
getBytes(index, dst, false);
return this;
}
//处理逻辑跟setBytes(int index, ByteBuffer src) 一样,对换src dst即可
private void getBytes(int index, ByteBuffer dst, boolean internal) {
checkIndex(index, dst.remaining()); ByteBuffer tmpBuf;
if (internal) {
tmpBuf = internalNioBuffer();
} else {
tmpBuf = buffer.duplicate();
} tmpBuf.clear().position(index).limit(index + dst.remaining());
dst.put(tmpBuf);
}
}
小结:
1.setBytes getBtyes逻辑基本是相同,只需要对换src dst
2.当判断是否src.nioBufferCount() 分支时其实写死也行,只不过为以后兼容其它类型ByteBuf