MapReduce-读取文件写入HBase

时间:2021-07-18 21:49:02

MapReduce直接写入HBase

代码如下

package com.hbase.mapreduce;

import java.io.IOException;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; /**
* @author:FengZhen
* @create:2018年9月14日
*/
public class ImportFromFile extends Configured implements Tool{ private static String addr="HDP233,HDP232,HDP231";
private static String port="2181";
public static final String NAME = "ImportFromFile";
public enum Counters { LINES } static class ImportMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> { private byte[] family = null;
private byte[] qualifier = null; @Override
protected void setup(Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context)
throws IOException, InterruptedException {
String column = context.getConfiguration().get("conf.column");
byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));
family = colkey[0];
if (colkey.length > 1) {
qualifier = colkey[1];
}
} @Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context)
throws IOException, InterruptedException {
try {
String lineString = value.toString();
//行键是经过MD5散列之后随机生成的键值
byte[] rowkey = DigestUtils.md5(lineString);
Put put = new Put(rowkey);
//存储原始数据到给定的表中的一列
put.addColumn(family, qualifier, Bytes.toBytes(lineString));
context.write(new ImmutableBytesWritable(rowkey), put);
context.getCounter(Counters.LINES).increment(1L);
} catch (Exception e) {
e.printStackTrace();
}
}
} /**
* 使用Apache Commons CLI类解析命令行参数。
* @param args
* @return
*/
private static CommandLine parseArgs(String[] args) {
Options options = new Options();
Option option = new Option("t", "table", true, "table to import into -must exist");
option.setArgName("table-name");
option.setRequired(true);
options.addOption(option); option = new Option("c", "column", true, "column to store row data into -must exit");
option.setArgName("family:qualifier");
option.setRequired(true);
options.addOption(option); option = new Option("i", "input", true, "the directory or file to read from");
option.setArgName("path-in-HDFS");
option.setRequired(true);
options.addOption(option); options.addOption("d", "debug", false, "switch on DEBUG log level"); CommandLineParser parser = new PosixParser();
CommandLine cmd = null;
try {
cmd = parser.parse(options, args);
} catch (ParseException e) {
e.printStackTrace();
System.err.println("ERROR: " + e.getMessage() + "\n");
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp(NAME + " ", options, true);
System.exit(1);
}
return cmd;
} public int run(String[] arg0) throws Exception {
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum",addr);
configuration.set("hbase.zookeeper.property.clientPort", port);
//String[] otherArgs = new GenericOptionsParser(configuration, arg0).getRemainingArgs();
//CommandLine commandLine = parseArgs(arg0); // String table = commandLine.getOptionValue("t");
// String input = commandLine.getOptionValue("i");
// String column = commandLine.getOptionValue("c"); String table = arg0[0];
String input = arg0[1];
String column = arg0[2];
configuration.set("conf.column", column); Job job = Job.getInstance(configuration);
job.setJobName("ImportFromFile");
job.setJarByClass(ImportFromFile.class);
job.setMapperClass(ImportMapper.class);
job.setOutputFormatClass(TableOutputFormat.class);
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Writable.class);
//这是一个只包含map阶段的作业,框架会直接跳过reduce阶段
job.setNumReduceTasks(0); FileInputFormat.addInputPath(job, new Path(input));
return job.waitForCompletion(true) ? 0 : 1;
} public static void main(String[] args) throws Exception {
String[] params = new String[] {"test_table_mr", "hdfs://fz/data/fz/input/hbase", "data:info"};
int exitCode = ToolRunner.run(new ImportFromFile(), params);
System.exit(exitCode);
}
}

MapReduce-读取文件写入HBase的更多相关文章

  1. Mapreduce的文件和hbase共同输入

    Mapreduce的文件和hbase共同输入 package duogemap;   import java.io.IOException;   import org.apache.hadoop.co ...

  2. MapReduce和Spark写入Hbase多表总结

    作者:Syn良子 出处:http://www.cnblogs.com/cssdongl 转载请注明出处 大家都知道用mapreduce或者spark写入已知的hbase中的表时,直接在mapreduc ...

  3. shell读取文件写入新文件

    #!/bin/sh #系统简称 SYST="HVPS" #发送行号 SEND1234SEND=" #接收行号 RECV1234RECV=" cd /home/w ...

  4. python小练习之读取文件写入excel

    文件是个json文件 内容为: 导入excel后的格式为 屡一下思路 一步步怎么实现: 1 首先需要读取json文件 然后将读取的内容转为字典 2 将excel的列名写入一个list中 然后遍历执行写 ...

  5. Python学习笔记五&lpar;读取提取写入文件&rpar;

    #Python打开读取一个文件内容,然后写入一个新的文件中,并对某些字段进行提取,写入新的字段的脚本,与大家共同学习. import os import re def get_filelist(dir ...

  6. 【HBase】HBase与MapReduce集成——从HDFS的文件读取数据到HBase

    目录 需求 步骤 一.创建maven工程,导入jar包 二.开发MapReduce程序 三.结果 需求 将HDFS路径 /hbase/input/user.txt 文件的内容读取并写入到HBase 表 ...

  7. 使用MapReduce读取HBase数据存储到MySQL

    Mapper读取HBase数据 package MapReduce; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hba ...

  8. 用mapreduce读取hdfs数据到hbase上

    hdfs数据到hbase过程 将HDFS上的文件中的数据导入到hbase中 实现上面的需求也有两种办法,一种是自定义mr,一种是使用hbase提供好的import工具 hbase先创建好表   cre ...

  9. MapReduce-从HBase读取数据处理后再写入HBase

    MapReduce-从HBase读取处理后再写入HBase 代码如下 package com.hbase.mapreduce; import java.io.IOException; import o ...

随机推荐

  1. 基本C语言滤波算法

    11种软件滤波方法的示例程序 假定从8位AD中读取数据(如果是更高位的AD可定义数据类型为int),子程序为get_ad(); 1.限副滤波 /*  A值可根据实际情况调整 value为有效值,new ...

  2. jQuery LigerUI V1&period;2&period;3 &lpar;包括API和全部源码&rpar; 发布

    前言 这次版本主要是增加了Panel和Portal组件,并增加了一套皮肤,并解决了部分兼容性的问题,添加了几个功能点. 欢迎使用反馈. 相关链接 API:         http://api.lig ...

  3. Winform快速开发组件的实现(一)

    好久好久没有露面了,呵呵,对于写文章都有点生疏了. 在拿到任何一个项目,不管是b/s的还是c/s,我不会立即开始写代码,我一般会为使这些项目能够快速开发制定一系列的支持组件,虽然可能前期会付出一些代价 ...

  4. C&num;中Byte转换相关的函数

    1.将一个对象转换为byte对象 public static byte GetByte(object o) { ; if (o != null) { byte tmp; if (byte.TryPar ...

  5. 关于viewWithTag的一点说明

    通常我们使用viewWithTag如下情形: 如果我们用了一个父View,上面放了多个子view, 每个子view都通过从0开始的Tag值来进行标志,以便于后期在像View上直接使用viewWithT ...

  6. 关于struts2中的default-action-ref

    struts2中的default-action-ref一般用于,在请求无效或错误时将请求指引到错误页面.我这次的用法是在请求首页之前先发送请求到后台,进行数据获取后再转至首页显示,但是出了一个问题,d ...

  7. Redis的KEYS命令引起宕机事件

    摘要: 使用 Redis 的开发者必看,吸取教训啊! 原文:Redis 的 KEYS 命令引起 RDS 数据库雪崩,RDS 发生两次宕机,造成几百万的资金损失 作者:陈浩翔 Fundebug经授权转载 ...

  8. synchronized 和reentrantlock的优缺点

    reentrantlock的优点 可以添加多个检控条件, 如果使用synchronized,则只能使用一个. 使用 reentrant locks 可以有多个wait()/notify() 队列. [ ...

  9. centos中selinux功能及常用服务配置

    SELinux: Secure Enhenced Linux 常用命令 获取selinux的当前状态: # getenforce 临时启用或禁用: # setenfoce 0|1 永久性启用,需要修改 ...

  10. General error&colon; 24374 OCIStmtFetch&colon; ORA-24374&colon; define not done before fetch or execute and fetch

    问题 $sql='insert into "test"("id") values(4)'; $res=$this->conn->query($sql ...