【文件属性】:
文件名称:HBase全局一致性事务支持Tephra.zip
文件大小:603KB
文件格式:ZIP
更新时间:2022-08-07 21:08:06
开源项目
Tephra 在 Apache HBase 的基础上提供了全局一致性的事务支持。HBase 提供了强一致性的基于行和区域的 ACID 操作支持,但是牺牲了在跨区域操作的支持。这就要求应用开发者花很大力气来确保区域边界上操作的一致性。而 Tephra 提供了全局事务支持,可以夸区域、跨表以及多个 RPC 上简化了应用的开发。示例代码: /**
* A Transactional SecondaryIndexTable.
*/
public class SecondaryIndexTable {
private byte[] secondaryIndex;
private TransactionAwareHTable transactionAwareHTable;
private TransactionAwareHTable secondaryIndexTable;
private TransactionContext transactionContext;
private final TableName secondaryIndexTableName;
private static final byte[] secondaryIndexFamily =
Bytes.toBytes("secondaryIndexFamily");
private static final byte[] secondaryIndexQualifier = Bytes.toBytes('r');
private static final byte[] DELIMITER = new byte[] {0};
public SecondaryIndexTable(TransactionServiceClient transactionServiceClient,
HTable hTable, byte[] secondaryIndex) {
secondaryIndexTableName =
TableName.valueOf(hTable.getName().getNameAsString() ".idx");
HTable secondaryIndexHTable = null;
HBaseAdmin hBaseAdmin = null;
try {
hBaseAdmin = new HBaseAdmin(hTable.getConfiguration());
if (!hBaseAdmin.tableExists(secondaryIndexTableName)) {
hBaseAdmin.createTable(new HTableDescriptor(secondaryIndexTableName));
}
secondaryIndexHTable = new HTable(hTable.getConfiguration(),
secondaryIndexTableName);
} catch (Exception e) {
Throwables.propagate(e);
} finally {
try {
hBaseAdmin.close();
} catch (Exception e) {
Throwables.propagate(e);
}
}
this.secondaryIndex = secondaryIndex;
this.transactionAwareHTable = new TransactionAwareHTable(hTable);
this.secondaryIndexTable = new TransactionAwareHTable(secondaryIndexHTable);
this.transactionContext = new TransactionContext(transactionServiceClient,
transactionAwareHTable,
secondaryIndexTable);
}
public Result get(Get get) throws IOException {
return get(Collections.singletonList(get))[0];
}
public Result[] get(List
gets) throws IOException {
try {
transactionContext.start();
Result[] result = transactionAwareHTable.get(gets);
transactionContext.finish();
return result;
} catch (Exception e) {
try {
transactionContext.abort();
} catch (TransactionFailureException e1) {
throw new IOException("Could not rollback transaction", e1);
}
}
return null;
}
public Result[] getByIndex(byte[] value) throws IOException {
try {
transactionContext.start();
Scan scan = new Scan(value, Bytes.add(value, new byte[0]));
scan.addColumn(secondaryIndexFamily, secondaryIndexQualifier);
ResultScanner indexScanner = secondaryIndexTable.getScanner(scan);
ArrayList gets = new ArrayList();
for (Result result : indexScanner) {
for (Cell cell : result.listCells()) {
gets.add(new Get(cell.getValue()));
}
}
Result[] results = transactionAwareHTable.get(gets);
transactionContext.finish();
return results;
} catch (Exception e) {
try {
transactionContext.abort();
} catch (TransactionFailureException e1) {
throw new IOException("Could not rollback transaction", e1);
}
}
return null;
}
public void put(Put put) throws IOException {
put(Collections.singletonList(put));
}
public void put(List puts) throws IOException {
try {
transactionContext.start();
ArrayList secondaryIndexPuts = new ArrayList();
for (Put put : puts) {
List indexPuts = new ArrayList();
Set>> familyMap = put.getFamilyMap().entrySet();
for (Map.Entry> family : familyMap) {
for (KeyValue value : family.getValue()) {
if (value.getQualifier().equals(secondaryIndex)) {
byte[] secondaryRow = Bytes.add(value.getQualifier(),
DELIMITER,
Bytes.add(value.getValue(),
DELIMITER,
value.getRow()));
Put indexPut = new Put(secondaryRow);
indexPut.add(secondaryIndexFamily, secondaryIndexQualifier, put.getRow());
indexPuts.add(indexPut);
}
}
}
secondaryIndexPuts.addAll(indexPuts);
}
transactionAwareHTable.put(puts);
secondaryIndexTable.put(secondaryIndexPuts);
transactionContext.finish();
} catch (Exception e) {
try {
transactionContext.abort();
} catch (TransactionFailureException e1) {
throw new IOException("Could not rollback transaction", e1);
}
}
}
}
标签:Tephra