Phoenix Overview and Java API Examples

Date: 2022-06-11 11:40:05


Related reference posts

segmentfault: xirong: Using Phoenix to update HBase data with SQL statements



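1. Phoenix Overview

Phoenix = HBase + SQL

Core capabilities of Phoenix:

  • SQL engine layer

    – Supports standard SQL-92 and compiles SQL statements into HBase API calls

    – Pushes operators and filter conditions down to the server side and executes them in parallel

    – Provides SQL-layer features such as lightweight transactions, secondary indexes, dynamic columns, and paged queries
  • JDBC driver
  • Metadata management
  • Integration with Spark, Hive, Pig, Flume, and MapReduce

Further reading:

Phoenix (Cloud HBase SQL): core features, principles, and use cases

瑾谦: Phoenix-related posts

jy02268879: Phoenix-related posts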

2. Phoenix Example 1: Java API Operations

2.1 phoenix.properties

phoenix.driver=org.apache.phoenix.jdbc.PhoenixDriver
phoenix.url=jdbc:phoenix:node1:2181
phoenix.user=root
phoenix.password=<password>
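
The phoenix.url value above follows the pattern jdbc:phoenix:<ZooKeeper quorum>:<port>, optionally followed by :<znode parent>. As a minimal sketch of how such a URL could be assembled from its parts (the class name PhoenixUrl and its method are hypothetical helpers, not part of Phoenix):

```java
import java.util.List;

// Hypothetical helper: assembles a Phoenix JDBC URL from its parts.
final class PhoenixUrl {
    private PhoenixUrl() {}

    /** Builds jdbc:phoenix:<host1,host2,...>:<port>[:<znodeParent>]. */
    static String build(List<String> zkHosts, int port, String znodeParent) {
        if (zkHosts == null || zkHosts.isEmpty()) {
            throw new IllegalArgumentException("at least one ZooKeeper host is required");
        }
        StringBuilder url = new StringBuilder("jdbc:phoenix:");
        url.append(String.join(",", zkHosts)).append(':').append(port);
        // The znode parent (for example /hbase) is optional
        if (znodeParent != null && !znodeParent.isEmpty()) {
            url.append(':').append(znodeParent);
        }
        return url.toString();
    }
}
```

For a single host node1 on port 2181 with no znode parent, this yields the same string as the phoenix.url entry above.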

2.2 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.sid.hbase</groupId>
    <artifactId>hbase-train</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <hbase.version>1.4.4</hbase.version>
        <hadoop.version>2.9.0</hadoop.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-core</artifactId>
            <version>4.14.0-HBase-1.4</version>
        </dependency>
        <!-- JUnit 4 is required by PhoenixTest.java (section 2.4) -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

2.3 ConfigUtils.java

package com.sid.hbase;

import java.util.Properties;

public class ConfigUtils {
    public static Properties p = new Properties();

    // Load phoenix.properties from the classpath once, when the class is initialized
    static {
        try {
            p.load(ClassLoader.getSystemResourceAsStream("phoenix.properties"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static String getDriver() {
        return p.getProperty("phoenix.driver");
    }

    public static String getUrl() {
        return p.getProperty("phoenix.url");
    }

    public static String getUserName() {
        return p.getProperty("phoenix.user");
    }

    public static String getPassWord() {
        return p.getProperty("phoenix.password");
    }

    // Quick manual check: print the four configured values
    public static void main(String[] args) {
        System.out.println(getDriver());
        System.out.println(getUrl());
        System.out.println(getUserName());
        System.out.println(getPassWord());
    }
}
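
ConfigUtils returns null for any key that is missing from phoenix.properties, and the failure only surfaces later when the driver or URL is used. One possible variation, sketched here under the assumption that failing fast is preferred, validates the required keys at load time (RequiredProps is a hypothetical class, not part of the original project):

```java
import java.io.IOException;
import java.io.Reader;
import java.util.Properties;

// Hypothetical helper: loads properties and fails fast on missing keys.
final class RequiredProps {
    private RequiredProps() {}

    static Properties load(Reader in, String... requiredKeys) throws IOException {
        Properties p = new Properties();
        p.load(in);
        for (String key : requiredKeys) {
            if (p.getProperty(key) == null) {
                throw new IllegalStateException("missing property: " + key);
            }
        }
        return p;
    }
}
```

A caller could pass a reader over phoenix.properties together with the four keys phoenix.driver, phoenix.url, phoenix.user, and phoenix.password.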

2.4 PhoenixTest.java

package com.sid.hbase;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.sql.*;

// The tests depend on each other (create, then upsert, then delete), so run them one at a time in that order
public class PhoenixTest {
    private Connection conn;
    private Statement stat;
    private ResultSet rs;

    // Open a connection and a statement before each test
    @Before
    public void initResource() throws Exception {
        Class.forName(ConfigUtils.getDriver());
        conn = DriverManager.getConnection(ConfigUtils.getUrl(), ConfigUtils.getUserName(), ConfigUtils.getPassWord());
        stat = conn.createStatement();
    }

    @Test
    public void testCreateTable() throws SQLException {
        String sql = "create table test_phoenix_api(mykey integer not null primary key, mycolumn varchar)";
        stat.executeUpdate(sql);
        conn.commit();
    }

    @Test
    public void upsert() throws SQLException {
        String sql1 = "upsert into test_phoenix_api values(1,'test1')";
        String sql2 = "upsert into test_phoenix_api values(2,'test2')";
        String sql3 = "upsert into test_phoenix_api values(3,'test3')";
        stat.executeUpdate(sql1);
        stat.executeUpdate(sql2);
        stat.executeUpdate(sql3);
        conn.commit();
    }

    @Test
    public void delete() throws SQLException {
        String sql1 = "delete from test_phoenix_api where mykey = 1";
        stat.executeUpdate(sql1);
        conn.commit();
    }

    // Release JDBC resources after each test
    @After
    public void closeResource() throws SQLException {
        if (rs != null) {
            rs.close();
        }
        if (stat != null) {
            stat.close();
        }
        if (conn != null) {
            conn.close();
        }
    }
}
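
The upsert test above embeds literal values in each SQL string. A parameterized variant is sketched below as one possible alternative; UpsertSketch, upsertSql, and upsertAll are hypothetical names, and only standard java.sql calls are used:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical helper: parameterized upserts into test_phoenix_api.
final class UpsertSketch {
    private UpsertSketch() {}

    /** Builds "UPSERT INTO t (a, b) VALUES (?, ?)" for trusted table and column names. */
    static String upsertSql(String table, List<String> columns) {
        String placeholders = String.join(", ", Collections.nCopies(columns.size(), "?"));
        return "UPSERT INTO " + table + " (" + String.join(", ", columns) + ") VALUES (" + placeholders + ")";
    }

    /** Upserts every (mykey, mycolumn) pair, then commits once. */
    static void upsertAll(Connection conn, Map<Integer, String> rows) throws SQLException {
        String sql = upsertSql("test_phoenix_api", Arrays.asList("mykey", "mycolumn"));
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            for (Map.Entry<Integer, String> row : rows.entrySet()) {
                ps.setInt(1, row.getKey());
                ps.setString(2, row.getValue());
                ps.executeUpdate();
            }
        }
        conn.commit();
    }
}
```

Binding values through placeholders avoids quoting problems in the data; the table and column names are still concatenated, so they must come from trusted code rather than user input.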

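Run the create-table, upsert, and delete tests one at a time, then check the results in Phoenix. The first query below was run after the upsert test and the second after the delete test: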

0: jdbc:phoenix:node1,node2,node3,node4> !table
+------------+--------------+-------------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------------+-----+
| TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION | INDEX_STATE | IMMUTABLE_ROWS | SALT_BUCKETS | MUL |
+------------+--------------+-------------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------------+-----+
| | SYSTEM | CATALOG | SYSTEM TABLE | | | | | | false | null | fal |
| | SYSTEM | FUNCTION | SYSTEM TABLE | | | | | | false | null | fal |
| | SYSTEM | LOG | SYSTEM TABLE | | | | | | true | 32 | fal |
| | SYSTEM | SEQUENCE | SYSTEM TABLE | | | | | | false | null | fal |
| | SYSTEM | STATS | SYSTEM TABLE | | | | | | false | null | fal |
| | | TEST_PHOENIX_API | TABLE | | | | | | false | null | fal |
| | | US_POPULATION | TABLE | | | | | | false | null | fal |
| | | wc | VIEW | | | | | | false | null | fal |
+------------+--------------+-------------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------------+-----+
0: jdbc:phoenix:node1,node2,node3,node4> select * from test_phoenix_api
. . . . . . . . . . . . . . . . . . . .> ;
+--------+-----------+
| MYKEY | MYCOLUMN |
+--------+-----------+
| 1 | test1 |
| 2 | test2 |
| 3 | test3 |
+--------+-----------+
3 rows selected (0.325 seconds)
0: jdbc:phoenix:node1,node2,node3,node4> select * from test_phoenix_api;
+--------+-----------+
| MYKEY | MYCOLUMN |
+--------+-----------+
| 2 | test2 |
| 3 | test3 |
+--------+-----------+
2 rows selected (0.14 seconds)
0: jdbc:phoenix:node1,node2,node3,node4>

3. Phoenix Example 2: Java API Operations

3.1 Add the dependencies to pom.xml

<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-core</artifactId>
    <version>4.13.1-HBase-1.3</version>
    <exclusions>
        <exclusion>
            <groupId>com.salesforce.i18n</groupId>
            <artifactId>i18n-util</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>

3.2 JDBC

import java.sql.*;

public class MyJDBC {
    private static String driverClassName;
    private static String URL;
    private static String username;
    private static String password;
    private static boolean autoCommit;

    /** Static field that caches an already opened connection. */
    private static Connection conn;

    static {
        config();
    }

    /** Database settings; edit these for your own cluster. */
    private static void config() {
        // JDBC driver class
        driverClassName = "org.apache.phoenix.jdbc.PhoenixDriver";
        // JDBC URL (ZooKeeper quorum and port)
        URL = "jdbc:phoenix:xx,xx,xx:2181";
        // User name
        username = "";
        // Password
        password = "";
        // Auto-commit mode applied to each connection; true commits every statement immediately
        autoCommit = true;
    }

    /** Loads the JDBC driver class. */
    private static boolean load() {
        try {
            Class.forName(driverClassName);
            return true;
        } catch (ClassNotFoundException e) {
            System.out.println("Failed to load driver class " + driverClassName);
        }
        return false;
    }

    /** Opens a database connection, reusing the cached one while it is still open. */
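    public static Connection connect() {
        try {
            // Reuse the cached connection instead of opening a new one on every call
            if (conn != null && !conn.isClosed()) {
                return conn;
            }
            // Load the driver, then open a new connection
            load();
            conn = DriverManager.getConnection(URL, username, password);
        } catch (SQLException e) {
            System.out.println("Failed to open the database connection: " + e.getMessage());
        }
        return conn;
    }

    /** Applies the configured auto-commit mode to the connection. */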
    public static void transaction() {
        try {
            conn.setAutoCommit(autoCommit);
        } catch (SQLException e) {
            System.out.println("Failed to set the commit mode to "
                    + (autoCommit ? "auto-commit" : "manual commit") + ": " + e.getMessage());
        }
    }

    /** Creates a Statement object. */
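    public static Statement statement() {
        Statement st = null;
        // Make sure a connection is available
        connect();
        // Apply the commit mode
        transaction();
        try {
            st = conn.createStatement();
        } catch (SQLException e) {
            System.out.println("Failed to create the Statement object: " + e.getMessage());
        }
        return st;
    }

    /**
     * Creates a PreparedStatement for an SQL statement with parameter placeholders.
     *
     * @param SQL the SQL statement with parameter placeholders
     * @param autoGeneratedKeys whether to request Statement.RETURN_GENERATED_KEYS
     * @return the corresponding PreparedStatement object
     */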
    private static PreparedStatement prepare(String SQL, boolean autoGeneratedKeys) {
        PreparedStatement ps = null;
        // Make sure a connection is available
        connect();
        // Apply the commit mode
        transaction();
        try {
            if (autoGeneratedKeys) {
                ps = conn.prepareStatement(SQL, Statement.RETURN_GENERATED_KEYS);
            } else {
                ps = conn.prepareStatement(SQL);
            }
        } catch (SQLException e) {
            System.out.println("Failed to create the PreparedStatement object: " + e.getMessage());
        }
        return ps;
    }

    /** Runs a SELECT statement, binding params to its placeholders when any are given. */
    public static ResultSet query(String SQL, Object... params) {
        if (SQL == null || !SQL.trim().toLowerCase().startsWith("select")) {
            throw new RuntimeException("The SQL statement is empty or is not a query");
        }
        ResultSet rs = null;
        if (params.length > 0) {
            // Parameters were passed in, so bind them to the placeholders
            PreparedStatement ps = prepare(SQL, false);
            try {
                for (int i = 0; i < params.length; i++) {
                    ps.setObject(i + 1, params[i]);
                }
                rs = ps.executeQuery();
            } catch (SQLException e) {
                System.out.println("Failed to execute the SQL: " + e.getMessage());
            }
        } else {
            // No parameters were passed in
            Statement st = statement();
            try {
                rs = st.executeQuery(SQL); // run the parameterless SQL statement directly
            } catch (SQLException e) {
                System.out.println("Failed to execute the SQL: " + e.getMessage());
            }
        }
        return rs;
    }

    /** Releases a ResultSet, Statement, or Connection. */
    public static void release(Object closeable) {
        if (closeable != null) {
            if (closeable instanceof ResultSet) {
                ResultSet rs = (ResultSet) closeable;
                try {
                    rs.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if (closeable instanceof Statement) {
                Statement st = (Statement) closeable;
                try {
                    st.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if (closeable instanceof Connection) {
                Connection c = (Connection) closeable;
                try {
                    c.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

3.3 serviceImpl

// Fragment of a service method: map (a Map<String, List<String>>), carNoByIn, startTime,
// and endTime are defined elsewhere in that method.
Connection connect = MyJDBC.connect();
String sql = "select CARNO,LINECODE,DATETIME,WLNG,WLAT,DISTANCE from ZY_PATROL_LOG where CARNO = ? and DATETIME >= ? and DATETIME <=? order by CARNO,LINECODE,DATETIME asc";
ResultSet resultSet = MyJDBC.query(sql, carNoByIn, startTime, endTime);
while (resultSet.next()) {
    String carNo = resultSet.getString(1);
    String lineCode = resultSet.getString(2);
    String x = resultSet.getString("WLNG");
    String y = resultSet.getString("WLAT");
    // Group the "x,y" coordinates by vehicle number and line code
    String key = carNo + "-" + lineCode;
    List<String> xys = map.get(key);
    if (xys == null) {
        xys = new ArrayList<>();
    }
    xys.add(x + "," + y);
    map.put(key, xys);
}
resultSet.close();
connect.close();
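
The grouping step in the loop above can be isolated from JDBC and written more compactly with Map.computeIfAbsent. The sketch below assumes each row has already been read into a String array of {carNo, lineCode, wlng, wlat}; the class TrackGrouping is hypothetical:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: groups "x,y" coordinates by "carNo-lineCode".
final class TrackGrouping {
    private TrackGrouping() {}

    /** Each row is {carNo, lineCode, wlng, wlat}; insertion order of keys is preserved. */
    static Map<String, List<String>> group(List<String[]> rows) {
        Map<String, List<String>> map = new LinkedHashMap<>();
        for (String[] row : rows) {
            String key = row[0] + "-" + row[1];
            // Create the list on first use, then append the coordinate pair
            map.computeIfAbsent(key, k -> new ArrayList<>()).add(row[2] + "," + row[3]);
        }
        return map;
    }
}
```

Because the query orders rows by CARNO, LINECODE, and DATETIME, the coordinates in each list stay in time order.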

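4. Common Phoenix Functions

Function                                Description
AVG ( numericTerm )                     Average of the values
COUNT (Term)                            Counts the non-null values of term
COUNT (DISTINCT Term)                   Counts the distinct values of term
MAX ( term )                            Maximum value
MIN ( term )                            Minimum value
SUM ( numericTerm )                     Sum of the values
ROUND(number, 2)                        Rounds to 2 decimal places
TO_NUMBER('123')                        Converts a string to a number; large values may be displayed in scientific notation
SUBSTR('Hello World', -5)               Returns the last five characters ('World'); a negative start counts from the end of the string
LENGTH('Hello')                         Length of the string
UPPER('Hello')                          Converts all letters to upper case
LOWER('HELLO')                          Converts all letters to lower case
REVERSE('Hello')                        Reverses the order of the characters
TO_CHAR(myDate, '2001-02-03 04:05:06')  Formats a date/time value as a string using the given pattern
TO_DATE('Sat, 3 Feb 2001 03:05:06 GMT', 'EEE, d MMM yyyy HH:mm:ss z')  Parses a string into a date using the given format pattern
CURRENT_DATE()                          Current system time, returned as a date
CURRENT_TIME()                          Current system time, returned as a time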
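
The start-index rule of SUBSTR is easy to misread, so here is a plain-Java emulation of the two-argument form as a rough sketch. It is not Phoenix code: it only mirrors the documented rule that the start is one-based and that a negative start is relative to the end of the string. Clamping out-of-range values is an assumption made to keep the sketch total.

```java
// Hypothetical emulation of SUBSTR(string, start) for illustration only.
final class SubstrSketch {
    private SubstrSketch() {}

    static String substr(String s, int start) {
        int begin;
        if (start > 0) {
            begin = start - 1;          // one-based position
        } else if (start == 0) {
            begin = 0;                  // zero is treated as the first character
        } else {
            begin = s.length() + start; // negative: offset from the end
        }
        // Assumption: clamp out-of-range positions instead of failing
        begin = Math.max(0, Math.min(begin, s.length()));
        return s.substring(begin);
    }
}
```

With this rule, substr("Hello World", -5) and substr("Hello World", 7) both select the characters starting at the "W".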

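5. Updating HBase Data with SQL Statements through Phoenix

5.1 Shell commands

Creating a table

Create a Person table with three columns, IDCardNum, Name, and Age, in the schema (TABLE_SCHEM) test. The standard SQL is: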

create table IF NOT EXISTS test.Person (IDCardNum INTEGER not null primary key, Name varchar(20),Age INTEGER);

In Phoenix, the statement is used as follows:

0: jdbc:phoenix:10.35.66.72> create table IF NOT EXISTS test.Person (IDCardNum INTEGER not null primary key, Name varchar(20),Age INTEGER);
No rows affected (0.344 seconds)
0: jdbc:phoenix:10.35.66.72> !tables
+------------------------------------------+------------------------------------------+------------------------------------------+-------------------+
| TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TA |
+------------------------------------------+------------------------------------------+------------------------------------------+-------------------+
| null | WL | BIG_LOG_DEVUTRACEID_INDEX | INDEX |
| null | WL | MSGCENTER_PUSHMESSAGE_V2_OWNERPAGE_INDEX | INDEX |
| null | SYSTEM | CATALOG | SYSTEM TABLE |
| null | SYSTEM | SEQUENCE | SYSTEM TABLE |
| null | SYSTEM | STATS | SYSTEM TABLE |
| null | DMO | SOWNTOWN_STATICS | TABLE |
| null | OL | BIGLOG | TABLE |
| null | TEST | PERSON | TABLE |
| null | WL | BIG_LOG | TABLE |
| null | WL | ERROR_LOG | TABLE |
| null | WL | MSGCENTER_PUSHMESSAGE | TABLE |
| null | WL | MSGCENTER_PUSHMESSAGE_V2 | TABLE |
+------------------------------------------+------------------------------------------+------------------------------------------+-------------------+
0: jdbc:phoenix:10.35.66.72> select * from TEST.PERSON;
+------------------------------------------+----------------------+------------------------------------------+
| IDCARDNUM | NAME | AGE |
+------------------------------------------+----------------------+------------------------------------------+
+------------------------------------------+----------------------+------------------------------------------+

Inserting rows

Insert rows into the table. The standard SQL is:

insert into test.Person (IDCardNum,Name,Age) values (100,'小明',12);
insert into test.Person (IDCardNum,Name,Age) values (101,'小红',15);
insert into test.Person (IDCardNum,Name,Age) values (103,'小王',22);

In Phoenix, the insert statement is upsert:

0: jdbc:phoenix:10.35.66.72> upsert into test.Person (IDCardNum,Name,Age) values (100,'小明',12);
1 row affected (0.043 seconds)
0: jdbc:phoenix:10.35.66.72> upsert into test.Person (IDCardNum,Name,Age) values (101,'小红',15);
1 row affected (0.018 seconds)
0: jdbc:phoenix:10.35.66.72> upsert into test.Person (IDCardNum,Name,Age) values (103,'小王',22);
1 row affected (0.009 seconds)
0: jdbc:phoenix:10.35.66.72> select * from test.Person;
+------------------------------------------+----------------------+------------------------------------------+
| IDCARDNUM | NAME | AGE |
+------------------------------------------+----------------------+------------------------------------------+
| 100 | 小明 | 12 |
| 101 | 小红 | 15 |
| 103 | 小王 | 22 |
+------------------------------------------+----------------------+------------------------------------------+
3 rows selected (0.115 seconds)

Altering the table

ALTER TABLE changes the table structure, here by adding a column. The standard SQL is:

ALTER TABLE test.Person ADD sex varchar(10);

In Phoenix:

0: jdbc:phoenix:10.35.66.72> ALTER TABLE test.Person ADD sex varchar(10);
No rows affected (0.191 seconds)
0: jdbc:phoenix:10.35.66.72> select * from test.person;
+------------------------------------------+----------------------+------------------------------------------+------------+
| IDCARDNUM | NAME | AGE | SEX |
+------------------------------------------+----------------------+------------------------------------------+------------+
| 100 | 小明 | 12 | null |
| 101 | 小红 | 15 | null |
| 103 | 小王 | 22 | null |
+------------------------------------------+----------------------+------------------------------------------+------------+
3 rows selected (0.113 seconds)

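Updating rows

The standard SQL for updating rows is:

update test.Person set sex='男' where IDCardNum=100;
update test.Person set sex='女' where IDCardNum=101;
update test.Person set sex='男' where IDCardNum=103;

Phoenix has no update keyword. upsert takes its place and covers both insert and update: upserting a row whose primary key already exists overwrites the listed columns.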

0: jdbc:phoenix:10.35.66.72> upsert into test.person (idcardnum,sex) values (100,'男');
1 row affected (0.083 seconds)
0: jdbc:phoenix:10.35.66.72> upsert into test.person (idcardnum,sex) values (101,'女');
1 row affected (0.012 seconds)
0: jdbc:phoenix:10.35.66.72> upsert into test.person (idcardnum,sex) values (103,'男');
1 row affected (0.008 seconds)
0: jdbc:phoenix:10.35.66.72> select * from test.person;
+------------------------------------------+----------------------+------------------------------------------+------------+
| IDCARDNUM | NAME | AGE | SEX |
+------------------------------------------+----------------------+------------------------------------------+------------+
| 100 | 小明 | 12 | 男 |
| 101 | 小红 | 15 | 女 |
| 103 | 小王 | 22 | 男 |
+------------------------------------------+----------------------+------------------------------------------+------------+
3 rows selected (0.087 seconds)
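
The upserts above identify each row by its primary key. For an update that selects rows with an arbitrary WHERE clause, Phoenix also accepts UPSERT ... SELECT. The helper below is a minimal sketch that builds such a statement as a string; UpdateAsUpsert and its parameters are hypothetical, and all arguments are assumed to be trusted identifiers and expressions rather than user input:

```java
// Hypothetical helper: rewrites "UPDATE t SET col = expr WHERE cond" as UPSERT ... SELECT.
final class UpdateAsUpsert {
    private UpdateAsUpsert() {}

    static String build(String table, String pkColumn, String column, String valueExpr, String where) {
        // Select the primary key of every matching row together with the new value,
        // and upsert that pair back into the same table.
        return "UPSERT INTO " + table + " (" + pkColumn + ", " + column + ") "
                + "SELECT " + pkColumn + ", " + valueExpr + " FROM " + table
                + " WHERE " + where;
    }
}
```

For example, build("test.person", "idcardnum", "sex", "'男'", "age > 20") produces a statement that sets sex for every person older than 20 without listing the keys one by one.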

Complex queries

Phoenix supports complex query clauses such as where, group by, and case when. For example:

# First add a few more rows
0: jdbc:phoenix:10.35.66.72> upsert into test.Person (IDCardNum,Name,Age,sex) values (104,'小张',23,'男');
1 row affected (0.012 seconds)
0: jdbc:phoenix:10.35.66.72> upsert into test.Person (IDCardNum,Name,Age,sex) values (105,'小李',28,'男');
1 row affected (0.015 seconds)
0: jdbc:phoenix:10.35.66.72> upsert into test.Person (IDCardNum,Name,Age,sex) values (106,'小李',33,'男');
1 row affected (0.011 seconds)
0: jdbc:phoenix:10.35.66.72> select * from test.person;
+------------------------------------------+----------------------+------------------------------------------+------------+
| IDCARDNUM | NAME | AGE | SEX |
+------------------------------------------+----------------------+------------------------------------------+------------+
| 100 | 小明 | 12 | 男 |
| 101 | 小红 | 15 | 女 |
| 103 | 小王 | 22 | 男 |
| 104 | 小张 | 23 | 男 |
| 105 | 小李 | 28 | 男 |
| 106 | 小李 | 33 | 男 |
+------------------------------------------+----------------------+------------------------------------------+------------+
6 rows selected (0.09 seconds)

where + group by statement

0: jdbc:phoenix:10.35.66.72> select sex ,count(sex) as num from test.person where age >20 group by sex;
+------------+------------------------------------------+
| SEX | NUM |
+------------+------------------------------------------+
| 男 | 4 |
+------------+------------------------------------------+

case when

0: jdbc:phoenix:10.35.66.72> select (case name when '小明' then '明明啊' when '小红' then '红红啊' else name end) as showname from test.person;
+------------------------------------------+
| SHOWNAME |
+------------------------------------------+
| 明明啊 |
| 红红啊 |
| 小王 |
| 小张 |
| 小李 |
| 小李 |
+------------------------------------------+

Deleting rows and dropping the table

The standard SQL for deleting rows and dropping the table is:

delete from test.Person where idcardnum=100;
drop table test.person;

Phoenix uses the same syntax as standard SQL:

0: jdbc:phoenix:10.35.66.72> delete from test.Person where idcardnum=100;
1 row affected (0.072 seconds)
0: jdbc:phoenix:10.35.66.72> select * from test.Person where idcardnum=100;
+------------------------------------------+----------------------+------------------------------------------+------------+
| IDCARDNUM | NAME | AGE | SEX |
+------------------------------------------+----------------------+------------------------------------------+------------+
+------------------------------------------+----------------------+------------------------------------------+------------+
0: jdbc:phoenix:10.35.66.72> drop table test.person;
No rows affected (1.799 seconds)
0: jdbc:phoenix:10.35.66.72> select * from test.person;
Error: ERROR 1012 (42M03): Table undefined. tableName=TEST.PERSON (state=42M03,code=1012)
0: jdbc:phoenix:10.35.66.72> !tables
+------------------------------------------+------------------------------------------+------------------------------------------+-------------------+
| TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TA |
+------------------------------------------+------------------------------------------+------------------------------------------+-------------------+
| null | WL | BIG_LOG_DEVUTRACEID_INDEX | INDEX |
| null | WL | MSGCENTER_PUSHMESSAGE_V2_OWNERPAGE_INDEX | INDEX |
| null | SYSTEM | CATALOG | SYSTEM TABLE |
| null | SYSTEM | SEQUENCE | SYSTEM TABLE |
| null | SYSTEM | STATS | SYSTEM TABLE |
| null | DMO | SOWNTOWN_STATICS | TABLE |
| null | OL | BIGLOG | TABLE |
| null | WL | BIG_LOG | TABLE |
| null | WL | ERROR_LOG | TABLE |
| null | WL | MSGCENTER_PUSHMESSAGE | TABLE |
| null | WL | MSGCENTER_PUSHMESSAGE_V2 | TABLE |
+------------------------------------------+------------------------------------------+------------------------------------------+-------------------+
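
The transcript above shows that querying a dropped table fails with error code 1012 and SQL state 42M03. From Java, the same values are available on the SQLException, so a client could detect this case as sketched below (PhoenixErrors is a hypothetical helper; the two constants are taken from the error message in the transcript):

```java
import java.sql.SQLException;

// Hypothetical helper: recognizes the "Table undefined" error shown in the transcript.
final class PhoenixErrors {
    private PhoenixErrors() {}

    static boolean isTableUndefined(SQLException e) {
        // ERROR 1012 (42M03): Table undefined
        return e.getErrorCode() == 1012 || "42M03".equals(e.getSQLState());
    }
}
```

A caller might use this in a catch block to decide whether to create the table or to rethrow the exception.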

5.2 Java client API usage

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.PreparedStatement;
import java.sql.Statement;

public class test {

    public static void main(String[] args) throws SQLException {
        Statement stmt = null;
        ResultSet rset = null;

        // Replace [zookeeper] with the ZooKeeper quorum of the HBase cluster
        Connection con = DriverManager.getConnection("jdbc:phoenix:[zookeeper]");
        stmt = con.createStatement();

        // Create the table, upsert two rows, and commit
        stmt.executeUpdate("create table test (mykey integer not null primary key, mycolumn varchar)");
        stmt.executeUpdate("upsert into test values (1,'Hello')");
        stmt.executeUpdate("upsert into test values (2,'World!')");
        con.commit();

        // Read the rows back and print the mycolumn value of each
        PreparedStatement statement = con.prepareStatement("select * from test");
        rset = statement.executeQuery();
        while (rset.next()) {
            System.out.println(rset.getString("mycolumn"));
        }
        statement.close();
        con.close();
    }
}