HBase 0.96 MVCC Lock 知识梳理

时间:2023-11-24 17:25:20

HBASE0.96 MVCC

写入的时候

每个Region维护一个MultiVersionConsistencyControl(mvcc)对象;Region下每个列族对应一个Store,数据按列族写入各自Store的Memstore,写入时共用Region的这个mvcc对象

 // Begin an MVCC write entry; w carries the write number used for this write.
 w = mvcc.beginMemstoreInsert();
...
// Apply the family map under write entry w and accumulate the size it returns.
addedSize += applyFamilyMapToMemstore(familyMaps[i], w);

这里beginMemstoreInsert其实是生成一个带WriteNumber的WriteEntry,并把它加入writeQueue;与之对应的是completeMemstoreInsert(WriteEntry e),只有调用了这个方法之后,该WriteNumber对应的写入才对读可见

/**
 * Starts a new memstore write: allocates the next write number, wraps it in a
 * {@code WriteEntry} and adds that entry to {@code writeQueue}.
 *
 * @return the newly queued entry, carrying the write number just allocated
 */
public WriteEntry beginMemstoreInsert() {
  // Number allocation and enqueueing happen under the same writeQueue monitor.
  synchronized (writeQueue) {
    final WriteEntry entry = new WriteEntry(++memstoreWrite);
    writeQueue.add(entry);
    return entry;
  }
}

applyFamilyMapToMemstore方法

  /**
   * Adds every cell of the given family map to the store of its column family,
   * stamping each resulting KeyValue with the write number of the MVCC write
   * entry before it is added.
   *
   * @param familyMap cells to apply, keyed by column family
   * @param localizedWriteEntry write entry whose write number is stamped on the
   *        cells; when {@code null}, this method begins its own entry and
   *        completes it before returning
   * @return the sum of the values returned by {@code store.add} for all cells
   */
  private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
      MultiVersionConsistencyControl.WriteEntry localizedWriteEntry) {
    long totalAdded = 0;
    // True only when the write entry was begun here, so it must be completed here too.
    boolean ownsWriteEntry = false;
    try {
      if (localizedWriteEntry == null) {
        localizedWriteEntry = mvcc.beginMemstoreInsert();
        ownsWriteEntry = true;
      }

      for (Map.Entry<byte[], List<Cell>> familyCells : familyMap.entrySet()) {
        byte[] familyName = familyCells.getKey();
        List<Cell> cellsOfFamily = familyCells.getValue();
        Store targetStore = getStore(familyName);
        for (Cell cell : cellsOfFamily) {
          KeyValue keyValue = KeyValueUtil.ensureKeyValue(cell);
          keyValue.setMvccVersion(localizedWriteEntry.getWriteNumber());
          totalAdded += targetStore.add(keyValue);
        }
      }
    } finally {
      // A caller-supplied entry is left open; the caller is expected to complete it.
      if (ownsWriteEntry) {
        mvcc.completeMemstoreInsert(localizedWriteEntry);
      }
    }
    return totalAdded;
  }

这里每个kv都是带版本号的

// Convert the cell to a KeyValue, then stamp it with the write entry's write number.
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
kv.setMvccVersion(localizedWriteEntry.getWriteNumber());

completeMemstoreInsert先通过advanceMemstore推进可读位置的版本号memstoreRead,并调用readWaiters.notifyAll()唤醒等待的线程;然后通过waitForRead等待memstoreRead推进到本次写入的WriteNumber

/**
 * Completes the given write entry: first calls {@code advanceMemstore}, which
 * marks the entry completed and moves {@code memstoreRead} forward as far as
 * the queue allows, then blocks in {@code waitForRead} until
 * {@code memstoreRead} has reached this entry's write number.
 *
 * @param e the write entry to complete
 */
public void completeMemstoreInsert(WriteEntry e) {
// The boolean returned by advanceMemstore is not used here; waitForRead
// re-checks memstoreRead itself.
advanceMemstore(e);
waitForRead(e);
}

advanceMemstore(e);

  /**
   * Marks the given entry completed, then removes every completed entry from
   * the head of {@code writeQueue} and publishes the write number of the last
   * one removed as the new {@code memstoreRead}.
   *
   * @param e the write entry that has finished
   * @return {@code true} if {@code memstoreRead} is now at least the write
   *         number of {@code e}, {@code false} otherwise
   * @throws RuntimeException if the queue is empty, or if consecutive queued
   *         entries do not carry consecutive write numbers
   */
  boolean advanceMemstore(WriteEntry e) {
    synchronized (writeQueue) {
      e.markCompleted();

      // The loop below would never run on an empty queue; fail up front instead.
      if (writeQueue.isEmpty()) {
        throw new RuntimeException("never was a first");
      }

      long lastCompleted = -1;
      while (!writeQueue.isEmpty()) {
        WriteEntry head = writeQueue.getFirst();

        // After the first removal, each next head must follow the previous one directly.
        if (lastCompleted > 0 && lastCompleted + 1 != head.getWriteNumber()) {
          throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: "
              + lastCompleted + " next: " + head.getWriteNumber());
        }

        // Stop at the first entry that is still in flight.
        if (!head.isCompleted()) {
          break;
        }
        lastCompleted = head.getWriteNumber();
        writeQueue.removeFirst();
      }

      // Publish the new read point only if at least one entry was removed.
      if (lastCompleted > 0) {
        synchronized (readWaiters) {
          memstoreRead = lastCompleted;
          readWaiters.notifyAll();
        }
      }

      return memstoreRead >= e.getWriteNumber();
    }
  }

waitForRead(e);

/**
 * Blocks the calling thread until {@code memstoreRead} is at least the write
 * number of the given entry.
 *
 * <p>An interrupt does not end the wait; it is remembered, and the thread's
 * interrupt flag is set again just before this method returns.
 *
 * @param e the write entry whose write number must be reached
 */
public void waitForRead(WriteEntry e) {
  boolean sawInterrupt = false;
  synchronized (readWaiters) {
    while (memstoreRead < e.getWriteNumber()) {
      try {
        // A timeout of 0 means: wait until notified.
        readWaiters.wait(0);
      } catch (InterruptedException ie) {
        // Keep looping until the read point has caught up; the interrupt
        // flag is restored on the way out.
        sawInterrupt = true;
      }
    }
  }
  if (sawInterrupt) {
    Thread.currentThread().interrupt();
  }
}