HBase--对数据的增删查改操作

时间:2022-02-13 08:25:36

    之前学习了HBase中对表的创建查看删除操作,现在进一步学习对表里面的数据进行操作。首先是增删改,码上一段代码:

package com.yc.hadoop.hbase;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HbaseAPI03 {
public static void main(String[] args) throws IOException {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "slave01,slave02,slave03");
ExecutorService pool = Executors.newCachedThreadPool();
Connection con = ConnectionFactory.createConnection(conf, pool);
Admin admin = con.getAdmin(); //管理者

TableName tn = TableName.valueOf("student"); //表对象
if(admin.tableExists(tn)){
Table t = con.getTable(tn,pool);

//添加数据
Put put = new Put(Bytes.toBytes("gr")); //根据rowkey(行键)值创建添加数据对象
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("gr")).
addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("21")).
addColumn(Bytes.toBytes("info"), Bytes.toBytes("sex"), Bytes.toBytes("女")).
addColumn(Bytes.toBytes("info"), Bytes.toBytes("tel"), Bytes.toBytes("12345678901"));
t.put(put); //写入数据是put
System.out.println("添加数据成功。。。。");

//修改数据
//修改与添加操作一样,区别是有数据(由rowkey,columnfamily,qualifier确定是否是有数据)时,
//再重复添加是修改
put = new Put(Bytes.toBytes("gr")).
addColumn(Bytes.toBytes("info"), Bytes.toBytes("tel"), Bytes.toBytes("12345678911"));
t.put(put); //修改数据可以看做是添加了数据
System.out.println("修改数据成功。。。。");

//删除数据
//根据rowkey值,创建删除数据对象
Delete del = new Delete(Bytes.toBytes("gr"));
del.addColumn(Bytes.toBytes("info"), Bytes.toBytes("tel"));
t.delete(del);
System.out.println("删除数据成功。。。。");


}
admin.close();
con.close();
pool.shutdown();
}
}
这里 是简单的插入数据、修改数据、以及删除数据。我们可以看到当插入数据时结果为:

HBase--对数据的增删查改操作

我们修改tel之后的结果:

HBase--对数据的增删查改操作

我们将tel修改之后,再删除这条数据,查看结果:

HBase--对数据的增删查改操作

我们发现之前我们修改的数据不见了,变成了最开始插入的数据,这就是我们之前说的hbase是有多版本的,它可以在同一位置存放不同数据,而且我们删除并不是真的将数据删除了,而是将这条数据标记删除了。且从代码可以看出,我们的修改其实就是添加。

以上是对表的简单的增删改。接下来就是查:

package com.yc.hadoop.hbase;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HbaseAPI04 {
public static void main(String[] args) throws IOException {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "slave01,slave02,slave03");
ExecutorService pool = Executors.newCachedThreadPool();
Connection con = ConnectionFactory.createConnection(conf, pool);
Admin admin = con.getAdmin(); //管理者

TableName tn = TableName.valueOf("student");
if(admin.tableExists(tn)){
Table t = con.getTable(tn,pool);
//查询数据
Get get = new Get(Bytes.toBytes("gr")); //根据行键值,创建查询对象
get.addFamily(Bytes.toBytes("info"));

Result result = t.get(get); //返回一个结果集

List<Cell> cells = result.listCells();
if(cells != null && cells.size() != 0){
for (Cell cell : cells) {
String rowkey = Bytes.toString(CellUtil.cloneRow(cell)); //CellUtil对单元格的操作的工具类
String columnFamily = Bytes.toString(CellUtil.cloneFamily(cell)); //列族
String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell)); //单元格修饰名
String value = Bytes.toString(CellUtil.cloneValue(cell)); //单元格的值
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss E");
String timeStr = sdf.format(new Date(cell.getTimestamp())); //时间戳
String str = String.format("行键:%s, 列族:%s, 单元格修饰名:%s, 单元格的值:%s, 时间戳:%s",
rowkey,columnFamily,qualifier,value,timeStr);
System.out.println(str);
}
}
}
admin.close();
con.close();
pool.shutdown();
}
}

结果为:

HBase--对数据的增删查改操作


我们通过GET方式得到我们的表数据,这里是根据行键创建查询对象,接着以查所有属于info列族的数据,可以根据自己的需要添加,若我们get.addColumn(Bytes.toBytes("info"), Bytes.toBytes("sex"));则是查询info列族下,列为sex的数据结果就是:


HBase--对数据的增删查改操作


在hbase中get是一种查询方式,我们还可以通过扫描scan进行查询,个人比较喜欢用这种方式。

public class HbaseAPI06 {
public static void main(String[] args) throws IOException {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "slave01,slave02,slave03");
ExecutorService pool = Executors.newCachedThreadPool();
Connection con = ConnectionFactory.createConnection(conf, pool);
Admin admin = con.getAdmin(); //管理者

TableName tn = TableName.valueOf("student");
if(admin.tableExists(tn)){
Table t = con.getTable(tn,pool);
//查询数据
Scan scan = new Scan(); //创建表的扫描对象 全表扫描 不需要指定行键值
//scan.addFamily(Bytes.toBytes("score"));

scan.setCaching(10); //设置缓存个数

//80分以上的
//ValueFilter filter = new ValueFilter(CompareOp.GREATER_OR_EQUAL, new BinaryComparator(Bytes.toBytes("80")));
ValueFilter filter = new ValueFilter(CompareOp.GREATER_OR_EQUAL, new RegexStringComparator("[8-9]\\d"));
scan.setFilter(filter);


ResultScanner result = t.getScanner(scan);
for (Result r : result) { //有几个行键,就循环几次
List<Cell> cells = r.listCells(); //每一个行键只有一个List<Cell>
if(cells != null && cells.size() != 0){
for (Cell cell : cells) {
String rowkey = Bytes.toString(CellUtil.cloneRow(cell)); //CellUtil对单元格的操作的工具类
String columnFamily = Bytes.toString(CellUtil.cloneFamily(cell)); //列族
String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell)); //单元格修饰名
String value = Bytes.toString(CellUtil.cloneValue(cell)); //单元格的值
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss E");
String timeStr = sdf.format(new Date(cell.getTimestamp())); //时间戳
String str = String.format("行键:%s, 列族:%s, 单元格修饰名:%s, 单元格的值:%s, 时间戳:%s",
rowkey,columnFamily,qualifier,value,timeStr);
System.out.println(str);
}
System.out.println("===========================================");
}
}
}
admin.close();
con.close();
pool.shutdown();
}
}

这里我们通过Scan来扫描表,来查询数据,结果为:

HBase--对数据的增删查改操作

我们看到以行键来循环,一个行键下对应了一个List<Cell> 

根据需要我们可以把整个增删查改封装一个工具类,这样方便使用,以后可直接使用。

public class HBaseUtil {
private static Configuration conf;
private static Connection con;
private static Admin admin;
private static ExecutorService pool;

static{
conf = HBaseConfiguration.create();

//加载hbase.properties配置文件信息
ResourceBundle rb = ResourceBundles.getBundle("hbase");
Enumeration<String> kvs = rb.getKeys();
while(kvs.hasMoreElements()){
String key = kvs.nextElement();
String value = rb.getString(key);
conf.set(key, value);
}
/*pool = Executors.newCachedThreadPool();
con = ConnectionFactory.createConnection(conf, pool);
admin = con.getAdmin(); //管理者*/
}

public static Connection getConn(){
try {
pool = Executors.newCachedThreadPool();
con = ConnectionFactory.createConnection(conf, pool);
return con;
} catch (IOException e) {
throw new RuntimeException("数据库连接失败!",e);
}

}


public static Admin getAdmin(){
try {
getConn();
admin = con.getAdmin();
return admin;//管理者
} catch (IOException e) {
throw new RuntimeException("取得管理者失败!",e);
}
}


public static void close(){
try {
if(admin != null){
admin.close();
}
} catch (IOException e) {
throw new RuntimeException("关闭管理者失败!",e);
}
try {
if(con != null && admin == null){
con.close();
}
} catch (IOException e) {
throw new RuntimeException("关闭连接失败!",e);
}
try {
if(pool.isShutdown()){
pool.shutdown();
}
} catch (Exception e) {
throw new RuntimeException("关闭连接池失败!",e);
}

}

/**
* 创建表
* @param tableName 表名
* @param columnFamilies 列族
* @return true表示创建成功反之失败
*/
public static boolean createTable(String tableName, String...columnFamilies){
getAdmin();
TableName tn = TableName.valueOf(tableName);
try {
if(!admin.tableExists(tn)){
HTableDescriptor htd = new HTableDescriptor(tn);
for (String cf : columnFamilies) {
HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes(cf));
htd.addFamily(hcd);
}
admin.createTable(htd); //创建表
return true;
}else{
throw new TableExistsException("表已经存在!!!");
}
} catch (IOException e) {
throw new RuntimeException("表已经存在!");
}finally{
close();
}
}

/**
* 删除表操作
* @param tableName 表名
* @return true表示删除表成功,反之失败
*/
public static boolean deleteTable(String tableName){
getAdmin();
TableName delTN = TableName.valueOf(tableName);
try {
if(admin.tableExists(delTN)){
admin.disableTable(delTN); //使表失效
admin.deleteTable(delTN); //删除表
return true;
}else{
throw new RuntimeException("表不存在!");
}
} catch (IOException e) {
throw new RuntimeException("删除表失败!",e);
}finally{
close();
}
}


/**
* 修改表操作
* @param tableName 表名
* @param mt 操作类型
* @param rowkey 行键
* @param params 参数(列族、单元格修饰名、单元格的值)
*/
public static void doUpdate(String tableName, MutationType mt, String rowkey, String...params){
getAdmin();
TableName tn = TableName.valueOf(tableName);
try {
if(admin.tableExists(tn)){
Table t = con.getTable(tn, pool);
switch (mt) {
case PUT:
Put put = null;
if(params.length == 3){
put = new Put(Bytes.toBytes(rowkey)).
addColumn(Bytes.toBytes(params[0]), Bytes.toBytes(params[1]), Bytes.toBytes(params[2]));
}else{
throw new RuntimeException("参数必须为三个!");
}
t.put(put);
break;
case DELETE:
Delete del = new Delete(Bytes.toBytes(rowkey));
if(params != null){
switch (params.length) {
case 1:
del.addFamily(Bytes.toBytes(params[0]));
break;
case 2:
del.addColumn(Bytes.toBytes(params[0]), Bytes.toBytes(params[1]));
break;
default:
throw new RuntimeException("最多两个参数");
}
}
t.delete(del);
break;
default:
throw new RuntimeException("只能进行增删改操作!");
}
}
} catch (IOException e) {
e.printStackTrace();
}finally{
close();
}
}


/**
* 多行修改
* @param tableName
* @param mt
* @param rowkey
* @param params 多列族
* String[]
*/
public static void doUpdate(String tableName, MutationType mt, String rowkey, String[]...params){
getAdmin();
TableName tn = TableName.valueOf(tableName);
try {
if(admin.tableExists(tn)){
Table t = con.getTable(tn, pool);
switch (mt) {
case PUT:
Put put = null;
put = new Put(Bytes.toBytes(rowkey));
for (String[] ps : params) {
if(ps.length == 3){
put.addColumn(Bytes.toBytes(ps[0]), Bytes.toBytes(ps[1]),Bytes.toBytes(ps[2]));
}else{
throw new RuntimeException("参数必须为三个!");
}
}
t.put(put);
break;
case DELETE:
Delete del = new Delete(Bytes.toBytes(rowkey));
for (String[] ps : params) {
if(ps != null){
switch (ps.length) {
case 1:
del.addFamily(Bytes.toBytes(ps[0]));
break;
case 2:
del.addColumn(Bytes.toBytes(ps[0]), Bytes.toBytes(ps[1]));
break;
default:
throw new RuntimeException("最多两个参数");
}
}
}
t.delete(del);
break;
default:
throw new RuntimeException("只能进行增删改操作!");
}
}
} catch (IOException e) {
e.printStackTrace();
}finally{
close();
}
}


/**
* 多表进行增删改操作
* @param tableName
* @param mt
* @param params 多行
* Map<String,List<String[]>>
*/
public static void doUpdate(String tableName, MutationType mt, Map<String,List<String[]>> params){
getAdmin();
TableName tn = TableName.valueOf(tableName);
try {
if(admin.tableExists(tn)){
Table t = con.getTable(tn, pool);
switch (mt) {
case PUT:
List<Put> puts = new ArrayList<Put>();
for (Entry<String, List<String[]>> entry : params.entrySet()) {
Put put = new Put(Bytes.toBytes(entry.getKey()));
for (String[] ps : entry.getValue()) {
if(ps.length == 3){
put.addColumn(Bytes.toBytes(ps[0]), Bytes.toBytes(ps[1]),Bytes.toBytes(ps[2]));
}else{
throw new RuntimeException("参数必须为三个!");
}
}
puts.add(put);
}
t.put(puts);
break;
case DELETE:
List<Delete> dels = new ArrayList<Delete>();
for (Entry<String, List<String[]>> entry : params.entrySet()) {
Delete del = new Delete(Bytes.toBytes(entry.getKey()));
if (entry.getValue() != null) {
for (String[] ps : entry.getValue()) {
if (ps != null && ps.length != 0) {
switch (ps.length) {
case 1:
del.addFamily(Bytes.toBytes(ps[0]));
break;
case 2:
del.addColumn(Bytes.toBytes(ps[0]), Bytes.toBytes(ps[1]));
break;
default:
throw new RuntimeException("最多两个参数");
}
}
}
}
dels.add(del);
}
t.delete(dels);
break;
default:
throw new RuntimeException("只能进行增删改操作!");
}
}
} catch (IOException e) {
e.printStackTrace();
}finally{
close();
}
}


/**
* 查找一条信息
* @param tableName
* @param rowkey
* @param columnFamily
* @param qualifiter
* @return
*/
public static String get(String tableName, String rowkey, String columnFamily, String qualifiter){
getAdmin();
try {
Table t = con.getTable(TableName.valueOf(tableName));
Get get = new Get(Bytes.toBytes(rowkey));
get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifiter));
Result result = t.get(get);
List<Cell> cells = result.listCells();

return Bytes.toString(CellUtil.cloneValue(cells.get(0)));

} catch (IOException e) {
throw new RuntimeException("获得表对象失败!!!",e);
}
}


public static Map<String,String> get(String tableName, String rowkey, String columnFamily, String...qualifiters){
getAdmin();
try {
Table t = con.getTable(TableName.valueOf(tableName));
Get get = new Get(Bytes.toBytes(rowkey));
if(qualifiters != null && qualifiters.length != 0){
for (String qualifiter : qualifiters) {
get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifiter));
}
}else if(columnFamily != null){
get.addFamily(Bytes.toBytes(columnFamily));
}

Result result = t.get(get);
List<Cell> cells = result.listCells();
Map<String,String> results = null;
if(cells != null && cells.size() != 0){
results = new HashMap<String, String>();
for (Cell cell : cells) {
results.put(Bytes.toString(CellUtil.cloneQualifier(cell)),
Bytes.toString(CellUtil.cloneValue(cell)));
}
}
return results;
} catch (IOException e) {
throw new RuntimeException("获得表对象失败!!!",e);
}

}

/**
* 查询操作
* @param tableName 表名
* @param rowkey 行键
* @param columnFamily 列族名
* @param clazz
* @return
*/
public static <T> T get(String tableName, String rowkey, String columnFamily, Class<T> clazz){
getAdmin();
try {
Table t = con.getTable(TableName.valueOf(tableName));
Get get = new Get(Bytes.toBytes(rowkey));
Field[] fs = clazz.getDeclaredFields();

for (Field f : fs) {
get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(f.getName()));
}
Result result = t.get(get);
List<Cell> cells = result.listCells();
T tObj = clazz.newInstance();
if(cells != null && cells.size() != 0){
for (Cell cell : cells) {
for (int i = 0; i < fs.length; i++) {
String valueStr = Bytes.toString(CellUtil.cloneQualifier(cell));
if(Bytes.toString(CellUtil.cloneQualifier(cell)).intern() == fs[i].getName().intern()){
Object value = null;
if(fs[i].getType().getName() == "int" || fs[i].getType().getName().intern() == "java.lang.Integer"){
value = Integer.parseInt(valueStr);
}else if(fs[i].getType().getName() == "double" || fs[i].getType().getName().intern() == "java.lang.Double"){
value = Double.parseDouble(valueStr);
}
fs[i].setAccessible(true);
fs[i].set(tObj, value);
}
}
}
}
return tObj;
} catch (IOException e) {
throw new RuntimeException("获得表对象失败!!!",e);
}catch (Exception e) {
throw new RuntimeException("创建对象失败!!!",e);
}

}

/**
* 扫描表查询
* @param tableName 表名
* @param params 参数
* @return
*
* Map<String,Map<String,String>> String:rowkey; Map<String,String> 列族+单元格
*/
public static Map<String,Map<String,String>> scan(String tableName, String[]...params){
getAdmin();
try {
Table t = con.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();

if(params != null && params.length != 0){
for (String[] param : params) {
if (param != null && param.length != 0) {
switch (param.length) {
case 1:
scan.addFamily(Bytes.toBytes(param[0]));
break;
case 2:
scan.addColumn(Bytes.toBytes(param[0]), Bytes.toBytes(param[1]));
break;
default:
throw new RuntimeException("参数只能是1个或2个!!!");
}
}
}
}

ResultScanner rs = t.getScanner(scan); //多行记录

Map<String,Map<String,String>> info = new HashMap<String,Map<String,String>>();
for (Result r : rs) {
List<Cell> cells = r.listCells();
Map<String,String> results = null;
if(cells != null && cells.size() != 0){
results = new HashMap<String, String>();
for (Cell cell : cells) {
results.put(Bytes.toString(CellUtil.cloneFamily(cell))+ ":" +Bytes.toString(CellUtil.cloneQualifier(cell)),
Bytes.toString(CellUtil.cloneValue(cell)));
}
}
info.put(Bytes.toString(r.getRow()), results);
}

return info;
} catch (IOException e) {
throw new RuntimeException("获得表对象失败!!!",e);
}catch (Exception e) {
throw new RuntimeException("创建对象失败!!!",e);
}

}

}