Problem:
Given the following data file city.txt (columns: id, city, value):
cat city.txt
1 wh 500
2 bj 600
3 wh 100
4 sh 400
5 wh 200
6 bj 100
7 sh 200
8 bj 300
9 sh 900
We need to group by city and then, within each group, keep the two records with the largest value (here that would be wh: 500 and 200; bj: 600 and 300; sh: 900 and 400).
1. This is the group-TopK problem that comes up all the time in real business work. Let's first see how Pig solves it:
a = load '/data/city.txt' using PigStorage(' ') as (id:chararray, city:chararray, value:int);
b = group a by city;
c = foreach b { c1 = order a by value desc; c2 = limit c1 2; generate group, c2.value; };
d = stream c through `sed 's/[(){}]//g'`;   -- strip the tuple/bag brackets from Pig's output
dump d;
Result: one line per city followed by its top two values (the sed step removes the surrounding brackets).
Essentially the same few lines (just drop the limit step) also reproduce what MySQL's group_concat function does:
a = load '/data/city.txt' using PigStorage(' ') as (id:chararray, city:chararray, value:int);
b = group a by city;
c = foreach b { c1 = order a by value desc; generate group, c1.value; };
d = stream c through `sed 's/[(){}]//g'`;
dump d;
Result: one line per city followed by all of its values in descending order.
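For comparison, a minimal sketch of the MySQL counterpart (assuming the same data is loaded into a MySQL table city(id, cname, value); GROUP_CONCAT is MySQL-specific):

-- MySQL: concatenate each city's values, ordered descending, into one row per city
SELECT cname, GROUP_CONCAT(value ORDER BY value DESC) AS vals
FROM city
GROUP BY cname;
-- e.g. wh -> 500,200,100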
2. Next, let's see how Hive handles the group-TopK problem:
HiveQL is essentially very close to SQL, but it still lacks quite a few features; it is not as powerful as native SQL, and in some respects it also falls short of Pig. So how would this kind of group-TopK problem be solved in plain SQL?
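One classic answer in MySQL-style SQL is a correlated subquery that counts, for each row, how many rows in the same city have a larger value; a minimal sketch, assuming a table city(id, cname, value) like the one created below:

-- keep a row only if fewer than 2 rows of the same city have a larger value
SELECT a.cname, a.value
FROM city a
WHERE 2 > (SELECT COUNT(*) FROM city b
           WHERE b.cname = a.cname AND b.value > a.value)
ORDER BY a.cname, a.value DESC;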
A query along those lines, however, fails with an outright syntax error in HQL, so we have to fall back on a Hive UDF:
the idea is to sort by city and value, use the UDF to keep a running counter per city, and then use a WHERE clause to drop the rows whose per-city counter has already reached k, keeping the top k rows of each group.
OK, on to the code:
(1) Define the UDF:
package com.example.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDF;

public final class Rank extends UDF {
    private int counter;
    private String last_key;

    // Returns 0 for the first row of a group, 1 for the second, and so on.
    public int evaluate(final String key) {
        if (!key.equalsIgnoreCase(this.last_key)) {
            this.counter = 0;      // new group: reset the counter
            this.last_key = key;
        }
        return this.counter++;
    }
}
(2) Register the jar, create the temporary function and the table, load the data, and run the query:
add jar Rank.jar;   -- assuming the compiled UDF jar is named Rank.jar; adjust the path as needed
create temporary function rank as 'com.example.hive.udf.Rank';
create table city(id int, cname string, value int) row format delimited fields terminated by ' ';
LOAD DATA LOCAL INPATH 'city.txt' OVERWRITE INTO TABLE city;

select cname, value from (
    select cname, rank(cname) csum, value from (
        select id, cname, value from city distribute by cname sort by cname, value desc
    ) a
) b
where b.csum < 2;   -- the counter starts at 0, so csum < 2 keeps the top two rows per city
(3) Result:
As you can see, Hive is a bit more cumbersome here than Pig, but as Hive keeps maturing it may well end up more concise than Pig some day.
REF: implementing top-N per group in Hive
http://baiyunl.iteye.com/blog/1466343
3. Finally, let's look at raw MapReduce:
import java.io.IOException;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class GroupTopK {
    // This MR job takes, for each age group, the 3 largest ids.
    // Test data generated by the script at http://my.oschina.net/leejun2005/blog/76631

    public static class GroupTopKMapper extends
            Mapper<LongWritable, Text, IntWritable, LongWritable> {
        IntWritable outKey = new IntWritable();
        LongWritable outValue = new LongWritable();
        String[] valArr = null;

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            valArr = value.toString().split("\t");
            outKey.set(Integer.parseInt(valArr[2]));   // age (int) is the grouping key
            outValue.set(Long.parseLong(valArr[0]));   // id (long) is the value
            context.write(outKey, outValue);
        }
    }

    public static class GroupTopKReducer extends
            Reducer<IntWritable, LongWritable, IntWritable, LongWritable> {

        LongWritable outValue = new LongWritable();

        public void reduce(IntWritable key, Iterable<LongWritable> values,
                Context context) throws IOException, InterruptedException {
            TreeSet<Long> idTreeSet = new TreeSet<Long>();
            for (LongWritable val : values) {
                idTreeSet.add(val.get());
                if (idTreeSet.size() > 3) {
                    // drop the smallest so the set always holds the 3 largest ids
                    idTreeSet.remove(idTreeSet.first());
                }
            }
            for (Long id : idTreeSet) {
                outValue.set(id);
                context.write(key, outValue);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();

        System.out.println(otherArgs.length);
        System.out.println(otherArgs[0]);
        System.out.println(otherArgs[1]);

        // otherArgs[0] is the main class name passed on the command line,
        // otherArgs[1] is the input path and otherArgs[2] the output path
        if (otherArgs.length != 3) {
            System.err.println("Usage: GroupTopK <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "GroupTopK");
        job.setJarByClass(GroupTopK.class);
        job.setMapperClass(GroupTopKMapper.class);
        job.setReducerClass(GroupTopKReducer.class);
        job.setNumReduceTasks(1);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
hadoop jar GroupTopK.jar GroupTopK /tmp/decli/record_new.txt /tmp/1
Result:
hadoop fs -cat /tmp/1/part-r-00000
0 12869695
0 12869971
0 12869976
1 12869813
1 12869870
1 12869951
......
Data verification (spot-check age group 0 with awk):
awk '$3==0{print $1}' record_new.txt|sort -nr|head -3
12869976
12869971
12869695
As you can see, the results check out.
Note: the test data was generated by the following script:
http://my.oschina.net/leejun2005/blog/76631
PS:
If Hive is roughly analogous to SQL, then Pig is more like PL/SQL stored procedures: you write programs more freely and can express more powerful logic.
Pig can even call static methods of Java classes directly via reflection; see the earlier Pig posts on this blog for details.
Here are a few Hive UDAF links for readers who want to dig further:
Hive UDAF and UDTF: getting top values after GROUP BY: http://blog.csdn.net/liuzhoulong/article/details/7789183
A custom Hive function (UDAF) that concatenates multiple rows of strings into one row: http://blog.sina.com.cn/s/blog_6ff05a2c0100tjw4.html
Writing a Hive UDAF: http://www.fuzhijie.me/?p=118
Hive UDAF development: http://richiehu.blog.51cto.com/2093113/386113