HBase + Solr Cloud实现HBase二级索引

时间:2023-01-02 14:33:09

1. 执行流程

HBase + Solr Cloud实现HBase二级索引

2. Solr Cloud实现

http://blog.csdn.net/u011462328/article/details/53008344

3. HBase实现

1) 自定义Observer

① 代码

  1. package cn.bfire.coprocessor;
  2. import com.typesafe.config.Config;
  3. import com.typesafe.config.ConfigFactory;
  4. import org.apache.hadoop.hbase.Cell;
  5. import org.apache.hadoop.hbase.CellUtil;
  6. import org.apache.hadoop.hbase.client.Delete;
  7. import org.apache.hadoop.hbase.client.Durability;
  8. import org.apache.hadoop.hbase.client.Put;
  9. import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
  10. import org.apache.hadoop.hbase.coprocessor.ObserverContext;
  11. import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
  12. import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
  13. import org.apache.hadoop.hbase.util.Bytes;
  14. import org.apache.solr.common.SolrInputDocument;
  15. import org.slf4j.Logger;
  16. import org.slf4j.LoggerFactory;
  17. import java.io.IOException;
  18. import java.util.List;
  19. /**
  20. * 为hbase提供二级索引的协处理器 Coprocesser
  21. */
  22. public class UserDevPiSolrObserver extends BaseRegionObserver {
  23. //加载配置文件属性
  24. static Config config = ConfigFactory.load("userdev_pi_solr.properties");
  25. //log记录
  26. private static final Logger logger = LoggerFactory.getLogger(UserDevPiSolrObserver.class);
  27. @Override
  28. public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
  29. // 获取行键值
  30. String rowkey = Bytes.toString(put.getRow());
  31. //实例化 SolrDoc
  32. SolrInputDocument doc = new SolrInputDocument();
  33. //添加Solr uniqueKey值
  34. doc.addField("rowkey", rowkey);
  35. // 获取需要索引的列
  36. String[] hbase_columns = config.getString("hbase_column").split(",");
  37. // 获取需要索引的列的值并将其添加到SolrDoc
  38. for (int i = 0; i < hbase_columns.length; i++) {
  39. String colName = hbase_columns[i];
  40. String colValue = "";
  41. // 获取指定列
  42. List<Cell> cells = put.get("cf".getBytes(), colName.getBytes());
  43. if (cells != null) {
  44. try {
  45. colValue = Bytes.toString(CellUtil.cloneValue(cells.get(0)));
  46. } catch (Exception ex) {
  47. logger.error("添加solrdoc错误", ex);
  48. }
  49. }
  50. doc.addField(colName, colValue);
  51. }
  52. //发送数据到本地缓存
  53. SolrIndexTools.addDoc(doc);
  54. }
  55. @Override
  56. public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {
  57. //得到rowkey
  58. String rowkey = Bytes.toString(delete.getRow());
  59. //发送数据本地缓存
  60. String solr_collection = config.getString("solr_collection");
  61. SolrIndexTools.delDoc(rowkey);
  62. }
  63. }
  1. package cn.bfire.coprocessor;
  2. import com.typesafe.config.Config;
  3. import com.typesafe.config.ConfigFactory;
  4. import org.apache.solr.client.solrj.impl.CloudSolrClient;
  5. import org.apache.solr.common.SolrInputDocument;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import java.util.ArrayList;
  9. import java.util.List;
  10. import java.util.Timer;
  11. import java.util.TimerTask;
  12. import java.util.concurrent.Semaphore;
  13. /**
  14. * solr索引处理客户端
  15. * 注意问题,并发提交时,需要线程协作资源
  16. */
  17. public class SolrIndexTools {
  18. //加载配置文件属性
  19. static Config config = ConfigFactory.load("userdev_pi_solr.properties");
  20. //log记录
  21. private static final Logger logger = LoggerFactory.getLogger(SolrIndexTools.class);
  22. //实例化solr的client
  23. static CloudSolrClient client = null;
  24. //添加批处理阈值
  25. static int add_batchCount = config.getInt("add_batchCount");
  26. //删除的批处理阈值
  27. static int del_batchCount = config.getInt("del_batchCount");
  28. //添加的集合缓冲
  29. static List<SolrInputDocument> add_docs = new ArrayList<SolrInputDocument>();
  30. //删除的集合缓冲
  31. static List<String> del_docs = new ArrayList<String>();
  32. static final List<String> zkHosts = new ArrayList<String>();
  33. static {
  34. logger.info("初始化索引调度........");
  35. String zk_host = config.getString("zk_host");
  36. String[] data = zk_host.split(",");
  37. for (String zkHost : data) {
  38. zkHosts.add(zkHost);
  39. }
  40. client = new CloudSolrClient.Builder().withZkHost(zkHosts).build();
  41. // 获取Solr collection
  42. String solr_collection = config.getString("solr_collection");
  43. client.setDefaultCollection(solr_collection);
  44. client.setZkClientTimeout(10000);
  45. client.setZkConnectTimeout(10000);
  46. //启动定时任务,第一次延迟1s执行,之后每隔指定时间30S执行一次
  47. Timer timer = new Timer();
  48. timer.schedule(new SolrCommit(), config.getInt("first_delay") * 1000, config.getInt("interval_commit_index") * 1000);
  49. }
  50. public static class SolrCommit extends TimerTask {
  51. @Override
  52. public void run() {
  53. logger.info("索引线程运行中........");
  54. //只有等于true时才执行下面的提交代码
  55. try {
  56. semp.acquire();//获取信号量
  57. if (add_docs.size() > 0) {
  58. client.add(add_docs);//添加
  59. }
  60. if (del_docs.size() > 0) {
  61. client.deleteById(del_docs);//删除
  62. }
  63. //确保都有数据才提交
  64. if (add_docs.size() > 0 || del_docs.size() > 0) {
  65. client.commit();//共用一个提交策略
  66. //清空缓冲区的添加和删除数据
  67. add_docs.clear();
  68. del_docs.clear();
  69. } else {
  70. logger.info("暂无索引数据,跳过commit,继续监听......");
  71. }
  72. } catch (Exception e) {
  73. logger.error("间隔提交索引数据出错!", e);
  74. } finally {
  75. semp.release();//释放信号量
  76. }
  77. }
  78. }
  79. /**
  80. * 添加数据到临时存储中,如果
  81. * 大于等于batchCount时,就提交一次,
  82. * 再清空集合,其他情况下走对应的时间间隔提交
  83. *
  84. * @param doc 单个document对象
  85. */
  86. public static void addDoc(SolrInputDocument doc) {
  87. commitIndex(add_docs, add_batchCount, doc, true);
  88. }
  89. /***
  90. * 删除的数据添加到临时存储中,如果大于
  91. * 对应的批处理就直接提交,再清空集合,
  92. * 其他情况下走对应的时间间隔提交
  93. *
  94. * @param rowkey 删除的rowkey
  95. */
  96. public static void delDoc(String rowkey) {
  97. commitIndex(del_docs, del_batchCount, rowkey, false);
  98. }
  99. // 任何时候,保证只能有一个线程在提交索引,并清空集合
  100. final static Semaphore semp = new Semaphore(1);
  101. /***
  102. * 此方法需要加锁,并且提交索引时,与时间间隔提交是互斥的
  103. * 百分百确保不会丢失数据
  104. *
  105. * @param datas 用来提交的数据集合
  106. * @param count 对应的集合提交数量
  107. * @param doc   添加的单个doc
  108. * @param isAdd 是否为添加动作
  109. */
  110. public synchronized static void commitIndex(List datas, int count, Object doc, boolean isAdd) {
  111. try {
  112. semp.acquire();//获取信号量
  113. if (datas.size() >= count) {
  114. if (isAdd) {
  115. client.add(datas);//添加数据到服务端中
  116. } else {
  117. client.deleteById(datas);//删除数据
  118. }
  119. client.commit();//提交数据
  120. datas.clear();//清空临时集合
  121. }
  122. } catch (Exception e) {
  123. e.printStackTrace();
  124. logger.error("按阈值" + (isAdd == true ? "添加" : "删除") + "操作索引数据出错!", e);
  125. } finally {
  126. datas.add(doc);//添加单条数据
  127. semp.release();//释放信号量
  128. }
  129. }
  130. }
    1. <pre code_snippet_id="1962705" snippet_file_name="blog_20161102_1_8333418" style="font-family: Consolas; font-size: 11.3pt; background-color: rgb(255, 255, 255);">pom文件配置</pre>
    2. <pre style="font-family:Consolas; font-size:11.3pt; background-color:rgb(255,255,255)"><pre code_snippet_id="1962705" snippet_file_name="blog_20161227_4_7977704" name="code" class="html"><?xml version="1.0" encoding="UTF-8"?>
    3. <project xmlns="http://maven.apache.org/POM/4.0.0"
    4. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    5. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    6. <modelVersion>4.0.0</modelVersion>
    7. <groupId>cn.gcks</groupId>
    8. <artifactId>hbase</artifactId>
    9. <version>1.0-SNAPSHOT</version>
    10. <dependencies>
    11. <!-- https://mvnrepository.com/artifact/org.apache.solr/solr-solrj -->
    12. <dependency>
    13. <groupId>org.apache.solr</groupId>
    14. <artifactId>solr-solrj</artifactId>
    15. <version>6.2.1</version>
    16. <exclusions>
    17. <exclusion>
    18. <groupId>org.slf4j</groupId>
    19. <artifactId>slf4j-api</artifactId>
    20. </exclusion>
    21. </exclusions>
    22. </dependency>
    23. <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
    24. <dependency>
    25. <groupId>org.apache.hbase</groupId>
    26. <artifactId>hbase-client</artifactId>
    27. <version>1.1.2</version>
    28. <exclusions>
    29. <exclusion>
    30. <groupId>org.apache.hadoop</groupId>
    31. <artifactId>*</artifactId>
    32. </exclusion>
    33. </exclusions>
    34. </dependency>
    35. <dependency>
    36. <groupId>org.apache.hbase</groupId>
    37. <artifactId>hbase-server</artifactId>
    38. <version>1.1.2</version>
    39. <exclusions>
    40. <exclusion>
    41. <groupId>org.apache.hadoop</groupId>
    42. <artifactId>*</artifactId>
    43. </exclusion>
    44. </exclusions>
    45. </dependency>
    46. <!-- https://mvnrepository.com/artifact/com.typesafe/config -->
    47. <dependency>
    48. <groupId>com.typesafe</groupId>
    49. <artifactId>config</artifactId>
    50. <version>1.3.1</version>
    51. </dependency>
    52. </dependencies>
    53. </project></pre></pre>
    54. <pre style="font-family:Consolas; font-size:11.3pt; background-color:rgb(255,255,255)"><p>
    55. </p><p><span style="font-weight:bold; color:rgb(0,128,0); font-size:11.3pt; background-color:rgb(228,228,255)">userdev_pi_solr.properties</span></p><p></p><pre code_snippet_id="1962705" snippet_file_name="blog_20161227_5_7563783" name="code" class="plain">#需要建索引的列
    56. hbase_column=oid,pi_id,statdate
    57. # solr的collection名称
    58. solr_collection=userdev_pi_day
    59. #定义solr的url地址,如果是cloud模式,可以配置多个以逗号分隔
    60. zk_host=1.1.1.1:2181,1.1.1.2:2181,1.1.1.3:2181
    61. #调度第一次开始时,延迟多少秒执行
    62. first_delay=10
    63. #后台线程多久提交一次索引,单位秒
    64. interval_commit_index=30
    65. #添加索引的批处理数量
    66. add_batchCount=10000
    67. #删除索引的批处理数量
    68. del_batchCount=2000</pre><br><br><p></p><p></p><p>② 打包代码并上传到<span style="font-family:Calibri">hdfs</span><span style="font-family:宋体">目录</span></p><p>③ 修改<span style="font-family:Calibri">HBase</span><span style="font-family:宋体">表(设置自定义</span><span style="font-family:Calibri">observer</span><span style="font-family:宋体">所在</span><span style="font-family:Calibri">hdfs</span><span style="font-family:宋体">位置,以及指定自定义</span><span style="font-family:Calibri">Observer</span><span style="font-family:宋体">全类名)</span></p><p>alter 'radius:raduserlog', 'coprocessor' => 'hdfs://<span style="color:rgb(0,112,192)">/apps/hbase/jars/hbase_solr.jar</span>|cn.bfire.coprocessor.UserDevPiSolrObserver|'</p><p>2) 数据查询代码</p><p></p><pre code_snippet_id="1962705" snippet_file_name="blog_20161102_4_5934630" name="code" class="java">package cn.bfire.solr;
    69. import org.apache.commons.logging.Log;
    70. import org.apache.commons.logging.LogFactory;
    71. import org.apache.hadoop.hbase.Cell;
    72. import org.apache.hadoop.hbase.CellUtil;
    73. import org.apache.hadoop.hbase.HBaseConfiguration;
    74. import org.apache.hadoop.hbase.TableName;
    75. import org.apache.hadoop.hbase.client.*;
    76. import org.apache.hadoop.hbase.util.Bytes;
    77. import org.apache.solr.client.solrj.SolrQuery;
    78. import org.apache.solr.client.solrj.impl.CloudSolrClient;
    79. import org.apache.solr.client.solrj.response.QueryResponse;
    80. import org.apache.solr.common.SolrDocument;
    81. import org.apache.solr.common.SolrDocumentList;
    82. import org.apache.solr.common.SolrInputDocument;
    83. import java.util.ArrayList;
    84. import java.util.Collection;
    85. import java.util.List;
    86. public class SolrCloudTest {
    87. public static final Log LOG = LogFactory.getLog(SolrCloudTest.class);
    88. private static CloudSolrClient cloudSolrClient;
    89. private static Connection connection;
    90. private static Table table;
    91. private static Get get;
    92. private static String defaultCollection = "userdev_pi_day";
    93. private static String hbaseTable = "<span style="font-family: Arial, Helvetica, sans-serif;">userdev_pi_day</span><span style="font-family: Arial, Helvetica, sans-serif;">";</span>
    94. List<Get> list = new ArrayList<Get>();
    95. static {
    96. final List<String> zkHosts = new ArrayList<String>();
    97. zkHosts.add("1.1.1.1:2181");
    98. zkHosts.add("1.1.1.2:2181");
    99. zkHosts.add("1.1.1.3:2181");
    100. cloudSolrClient = new CloudSolrClient.Builder().withZkHost(zkHosts).build();
    101. final int zkClientTimeout = 10000;
    102. final int zkConnectTimeout = 10000;
    103. cloudSolrClient.setDefaultCollection(defaultCollection);
    104. cloudSolrClient.setZkClientTimeout(zkClientTimeout);
    105. cloudSolrClient.setZkConnectTimeout(zkConnectTimeout);
    106. try {
    107. connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
    108. table = connection.getTable(TableName.valueOf(hbaseTable));
    109. } catch (Exception e) {
    110. e.printStackTrace();
    111. }
    112. }
    113. private void addIndex(CloudSolrClient cloudSolrClient) throws Exception {
    114. Collection<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
    115. for (int i = 0; i <= 100; i++) {
    116. SolrInputDocument doc = new SolrInputDocument();
    117. String key = "";
    118. key = String.valueOf(i);
    119. doc.addField("rowkey", key);
    120. doc.addField("usermac", key + "usermac");
    121. doc.addField("userid", key + "userid");
    122. doc.addField("usertype", key + "usertype");
    123. doc.addField("city_id", key + "city_id");
    124. docs.add(doc);
    125. }
    126. LOG.info("docs info:" + docs + "\n");
    127. cloudSolrClient.add(docs);
    128. cloudSolrClient.commit();
    129. }
    130. public void search(CloudSolrClient cloudSolrClient, String Str) throws Exception {
    131. SolrQuery query = new SolrQuery();
    132. query.setRows(100);
    133. query.setQuery(Str);
    134. LOG.info("query string: " + Str);
    135. QueryResponse response = cloudSolrClient.query(query);
    136. SolrDocumentList docs = response.getResults();
    137. System.out.println("文档个数:" + docs.getNumFound()); //数据总条数也可轻易获取
    138. System.out.println("查询时间:" + response.getQTime());
    139. System.out.println("查询总时间:" + response.getElapsedTime());
    140. for (SolrDocument doc : docs) {
    141. String rowkey = (String) doc.getFieldValue("rowkey");
    142. get = new Get(Bytes.toBytes(rowkey));
    143. list.add(get);
    144. }
    145. Result[] res = table.get(list);
    146. for (Result rs : res) {
    147. Cell[] cells = rs.rawCells();
    148. for (Cell cell : cells) {
    149. System.out.println("============");
    150. System.out.println(new String(CellUtil.cloneRow(cell)));
    151. System.out.println(new String(CellUtil.cloneFamily(cell)));
    152. System.out.println(new String(CellUtil.cloneQualifier(cell)));
    153. System.out.println(new String(CellUtil.cloneValue(cell)));
    154. System.out.println("============");
    155. break;
    156. }
    157. }
    158. table.close();
    159. }
    160. public static void main(String[] args) throws Exception {
    161. cloudSolrClient.connect();
    162. SolrCloudTest solrt = new SolrCloudTest();
    163. //            solrt.addIndex(cloudSolrClient);
    164. solrt.search(cloudSolrClient, "userid:11111");
    165. cloudSolrClient.close();
    166. }
    167. }
    168. </pre><br><br><p></p><p></p><pre></pre><pre></pre></pre>
    169. <pre></pre>
    170. <link rel="stylesheet" href="http://static.blog.csdn.net/public/res-min/markdown_views.css?v=2.0">

HBase + Solr Cloud实现HBase二级索引的更多相关文章

  1. 使用ElasticSearch赋能HBase二级索引 &vert; 实践一年后总结

    前言:还记得那是2018年的一个夏天,天气特别热,我一边擦汗一边听领导大刀阔斧的讲述自己未来的改革蓝图.会议开完了,核心思想就是:我们要搞一个数据大池子,要把公司能灌的数据都灌入这个大池子,然后让别人 ...

  2. 「从零单排HBase 12」HBase二级索引Phoenix使用与最佳实践

    Phoenix是构建在HBase上的一个SQL层,能让我们用标准的JDBC APIs对HBase数据进行增删改查,构建二级索引.当然,开源产品嘛,自然需要注意“避坑”啦,阿丸会把使用方式和最佳实践都告 ...

  3. 基于Solr实现HBase的二级索引

    文章来源:http://www.open-open.com/lib/view/open1421501717312.html 实现目的: 由于hbase基于行健有序存储,在查询时使用行健十分高效,然后想 ...

  4. hbase基于solr配置二级索引

    一.概述 Hbase适用于大表的存储,通过单一的RowKey查询虽然能快速查询,但是对于复杂查询,尤其分页.查询总数等,实现方案浪费计算资源,所以可以针对hbase数据创建二级索引(Hbase Sec ...

  5. CDH使用Solr实现HBase二级索引

      一.为什么要使用Solr做二级索引二.实时查询方案三.部署流程3.1 安装HBase.Solr3.2 增加HBase复制功能3.3创建相应的 SolrCloud 集合3.4 创建 Lily HBa ...

  6. HBase协处理器同步二级索引到Solr&lpar;续&rpar;

    一. 已知的问题和不足二.解决思路三.代码3.1 读取config文件内容3.2 封装SolrServer的获取方式3.3 编写提交数据到Solr的代码3.4 拦截HBase的Put和Delete操作 ...

  7. HBase协处理器同步二级索引到Solr

    一. 背景二. 什么是HBase的协处理器三. HBase协处理器同步数据到Solr四. 添加协处理器五. 测试六. 协处理器动态加载 一. 背景 在实际生产中,HBase往往不能满足多维度分析,我们 ...

  8. Lily HBase Indexer同步HBase二级索引到Solr丢失数据的问题分析

    一.问题描述二.分析步骤2.1 查看日志2.2 修改Solr的硬提交2.3 寻求*帮助2.4 修改了read-row="never"后,丢失部分字段2.5 ...

  9. HBase协处理器的使用&lpar;添加Solr二级索引&rpar;

    给HBase添加一二级索引,HBase协处理器结合solr 代码如下 package com.hbase.coprocessor; import java.io.IOException; import ...

随机推荐

  1. nginx安装及配置为简单的文件服务器

    centos 6.5 直接yum安装即可 yum install nginx -y 配置文件位于:/etc/nginx/nginx.conf,里面可以修改处理器数量.日志路径.pid文件路径等,默认的 ...

  2. Irrlicht 鬼火

    1.下载引擎 2.引入头文件 在VS2010下新建项目,项目->属性->配置属性->VC++目录 在包含目录中:添加 引擎安装目录\include\ 在库目录中:添加 引擎安装目录\ ...

  3. CPU informition

    tar jxvf util-linux-ng-2.18.bz2cd util-linux-ng-2.18/./configure --enable-arch --enable-partx --enab ...

  4. 编写who命令:文件操作,缓冲区与联机帮助

    最近阅读UULP(Understanding Unix/Linux Programming),按照书中介绍对Unix/Linux系统编程进行学习梳理,总结如下. 1. who命令能做什么 who命令用 ...

  5. 微信开发第3章 通过accesstoken获取用户分组

    上一章我们获取到了access_token,那么我们可以试着拿token获取用户粉丝分组,调用接口地址为: http请求方式: GET(请使用https协议) https://api.weixin.q ...

  6. Java字符串之String与StringBuilder

    String与SringBuiler的一些比较   在Java中,我们会大量使用字符串,但是String究竟是怎样工作的我们可能没有想过太多,其实在String类中,每一个看起来会修改String值的 ...

  7. JS中this到底指向谁?

    关于this的指向,是一个令人很头疼的问题.但是,你运气好,碰到了我.老夫这儿有本祖传秘籍,看懂这个,妈妈再也不用担心你的this指向不对啦! 归根结底,this指向就一句话:谁最终调用函数,this ...

  8. Eclipse 扩展activiti-desinger 安装

    activiti-desinger 工作流画图工具分为在线安装.离线安装两种方式:下图提供当前所用eclipse版本信息 1.1        在线安装 打开Eclipse -> Help -& ...

  9. 基于 IJKPlayer-concat 协议的视频无缝拼接技术实现

    一.前言 Hi,大家好,我是承香墨影! 开门见山,开篇名义.今天来聊聊如何将多段视频,拼接成一个完整而连续的视频,然后无缝进行播放. 这样的需求应该不算偏门吧? 最简单的就是一些视频 App,会将大段 ...

  10. Android应用程序类型和进程状态

    来自<Android4高级编程> Android应用程序不能控制自己的生命周期,应用程序组件(Activity.Service等其他组件)必须监听应用程序状态的变化并做出适当的反应,而且特 ...