MapReduce小文件优化与分区

时间:2023-02-18 21:58:37

一、小文件优化

1.Mapper类

package com.css.combine;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper; /**
* 思路?
* wordcount单词计数
* <单词,1>
*
* 数据传输
*
* KEYIN:数据的起始偏移量0~10 11~20 21~30
* VALUEIN:数据
*
* KEYOUT:mapper输出到reduce阶段 k的类型
* VALUEOUT:mapper输出到reduce阶段v的类型
* <China,1><Beijing,1><love,1>
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
//key 起始偏移量 value 数据 context 上下文
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 1.读取数据
String line = value.toString();
// 2.切割 love Beijing
String[] words = line.split(" ");
// 3.循环的写到下一个阶段<love,1><Beijing,1>
for (String w : words) {
// 4.输出到reducer阶段
context.write(new Text(w), new IntWritable(1));
}
}
}

2.Reducer类

package com.css.combine;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer; public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
// 统计单词出现的次数
int sum = 0;
// 累加求和
for (IntWritable count : values) {
// 拿到值累加
sum += count.get();
}
// 结果输出
context.write(key, new IntWritable(sum));
}
}

3.Driver类

package com.css.combine;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 获取job信息
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 获取jar包
job.setJarByClass(WordCountDriver.class);
// 获取自定义的mapper与reducer类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 设置map输出的数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 设置reduce输出的数据类型(最终的数据类型)
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 指定运行的inputformat方式 默认的方式是textinputformat(小文件优化)
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 最大4M
CombineTextInputFormat.setMinInputSplitSize(job, 3145728); // 最小3M
// 设置输入存在的路径与处理后的结果路径
FileInputFormat.setInputPaths(job, new Path("c:/in1024/"));
FileOutputFormat.setOutputPath(job, new Path("c:/out1024/"));
// 提交任务
boolean rs = job.waitForCompletion(true);
System.out.println(rs ? 0 : 1);
}
}

二、分区

1.Mapper类

package com.css.flow.partition;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper; /**
* 3631279850362 13726130503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 www.itstaredu.com 教育网站 24 27 299 681 200
*
* 13726130503 299 681 980
*/
public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 1.获取数据
String line = value.toString(); // 2.切割
String[] fields = line.split("\t"); // 3.封装对象 拿到关键字段 数据清洗
String phoneN = fields[1];
long upFlow = Long.parseLong(fields[fields.length - 3]);
long dfFlow = Long.parseLong(fields[fields.length - 2]); // 4.输出到reduce端
context.write(new Text(phoneN), new FlowBean(upFlow, dfFlow));
}
}

2.Reducer类

package com.css.flow.partition;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer; public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context)
throws IOException, InterruptedException {
// 1.相同手机号 的流量使用再次汇总
long upFlow_sum = 0;
long dfFlow_sum = 0; // 2.累加
for (FlowBean f : values) {
upFlow_sum += f.getUpFlow();
dfFlow_sum += f.getDfFlow();
}
FlowBean rs = new FlowBean(upFlow_sum, dfFlow_sum);
// 3.输出
context.write(key, rs);
}
}

3.封装类

package com.css.flow.partition;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException; import org.apache.hadoop.io.Writable; /**
* 封装类 数据的传输
*/
public class FlowBean implements Writable{
// 定义属性
private long upFlow;
private long dfFlow;
private long flowSum; public FlowBean() {
} // 流量累加
public FlowBean(long upFlow, long dfFlow) {
this.upFlow = upFlow;
this.dfFlow = dfFlow;
this.flowSum = upFlow + dfFlow;
} // 反序列化
@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
dfFlow = in.readLong();
flowSum = in.readLong();
} // 序列化
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(dfFlow);
out.writeLong(flowSum);
} @Override
public String toString() {
return upFlow + "\t" + dfFlow + "\t" + flowSum;
} public long getUpFlow() {
return upFlow;
} public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
} public long getDfFlow() {
return dfFlow;
} public void setDfFlow(long dfFlow) {
this.dfFlow = dfFlow;
} public long getFlowSum() {
return flowSum;
} public void setFlowSum(long flowSum) {
this.flowSum = flowSum;
}
}

4.分区类

package com.css.flow.partition;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner; public class PhoneNumPartitioner extends Partitioner<Text, FlowBean>{ // 根据手机号前三位进行分区
@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {
// 获取手机号前三位
String phoneNum = key.toString().substring(0, 3);
// 分区
int partitioner = 4; if ("135".equals(phoneNum)) {
return 0;
}else if ("137".equals(phoneNum)) {
return 1;
}else if ("138".equals(phoneNum)) {
return 2;
}else if ("139".equals(phoneNum)) {
return 3;
}
return partitioner;
}
}

5.Driver类

package com.css.flow.partition;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class FlowCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1.获取job信息
Configuration conf = new Configuration();
Job job = Job.getInstance(conf); // 2.获取jar包
job.setJarByClass(FlowCountDriver.class); // 3.获取自定义的mapper与reducer类
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class); // 4.设置map输出的数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class); // 5.设置reduce输出的数据类型(最终的数据类型)
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class); // 设置自定义的分区类
// 自定义分区个数要大于分区数
job.setPartitionerClass(PhoneNumPartitioner.class);
job.setNumReduceTasks(5); // 6.设置输入存在的路径与处理后的结果路径
FileInputFormat.setInputPaths(job, new Path("c:/flow1020/in"));
FileOutputFormat.setOutputPath(job, new Path("c:/flow1020/out")); // 7.提交任务
boolean rs = job.waitForCompletion(true);
System.out.println(rs ? 0 : 1);
}
}

6.输入的文件HTTP_20180313143750.dat

3631279850362    13726130503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    www.itstaredu.com    教育网站    24    27    299    681    200
3631279950322 13822544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 www.taobao.com 淘宝网 4 0 264 0 200
3631279910362 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
3631244000322 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
3631279930342 18212575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200
3631279950342 13884138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200
3631279930352 13510439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
3631279950332 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 316 296 200
3631279830392 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
3631279840312 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 660 690 200
3631279730382 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 369 338 200
3631279860392 15889002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 938 380 200
3631279920332 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
3631279860312 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 120 1320 200
3631279840302 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 198 910 200
3631279950332 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
3631279820302 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 735 11349 400
3631279860322 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 212 200
3631279900332 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 4243 200
3631279880322 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
3631279850362 13726238888 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
3631279930352 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1136 94 200
3631279930353 13560436326 C4-17-FE-BA-DE-D9:CMCC 120.196.100.77 lol.qq.com/ 英雄联盟 18 15 1136 94 200

7.输出的文件

(1)part-r-00000
13502468823 735 11349 12084
13510439658 1116 954 2070
13560436326 1136 94 1230
13560436666 1136 94 1230
13560439658 918 4938 5856 (2)part-r-00001
13719199419 240 0 240
13726130503 299 681 980
13726238888 2481 24681 27162
13760778710 120 120 240 (3)part-r-00002
13822544101 264 0 264
13884138413 4116 1432 5548 (4)part-r-00003
13922314466 3008 3720 6728
13925057413 11058 4243 15301
13926251106 240 0 240
13926435656 132 1512 1644 (5)part-r-00004
13480253104 120 1320 1440
13602846565 198 910 1108
13660577991 660 690 1350
15013685858 369 338 707
15889002119 938 380 1318
15920133257 316 296 612
18212575961 1527 2106 3633
18320173382 9531 212 9743

MapReduce小文件优化与分区的更多相关文章

  1. MapReduce小文件处理之CombineFileInputFormat实现

    在MapReduce使用过程中.一般会遇到输入文件特别小(几百KB.几十MB).而Hadoop默认会为每一个文件向yarn申请一个container启动map,container的启动关闭是很耗时的. ...

  2. MaxCompute小文件问题优化方案

    小文件背景知识 小文件定义 分布式文件系统按块Block存放,文件大小比块大小小的文件(默认块大小为64M),叫做小文件. 如何判断存在小文件数量多的问题 查看文件数量 desc extended + ...

  3. &lbrack;大牛翻译系列&rsqb;Hadoop(17)MapReduce 文件处理:小文件

    5.1 小文件 大数据这个概念似乎意味着处理GB级乃至更大的文件.实际上大数据可以是大量的小文件.比如说,日志文件通常增长到MB级时就会存档.这一节中将介绍在HDFS中有效地处理小文件的技术. 技术2 ...

  4. 第3节 mapreduce高级:5、6、通过inputformat实现小文件合并成为sequenceFile格式

    1.1 需求 无论hdfs还是mapreduce,对于小文件都有损效率,实践中,又难免面临处理大量小文件的场景,此时,就需要有相应解决方案 1.2 分析 小文件的优化无非以下几种方式: 1.  在数据 ...

  5. Spark优化之小文件是否需要合并&quest;

    我们知道,大部分Spark计算都是在内存中完成的,所以Spark的瓶颈一般来自于集群(standalone, yarn, mesos, k8s)的资源紧张,CPU,网络带宽,内存.Spark的性能,想 ...

  6. Hadoop MapReduce编程 API入门系列之小文件合并(二十九)

    不多说,直接上代码. Hadoop 自身提供了几种机制来解决相关的问题,包括HAR,SequeueFile和CombineFileInputFormat. Hadoop 自身提供的几种小文件合并机制 ...

  7. hive优化之自己主动合并输出的小文件

    1.先在hive-site.xml中设置小文件的标准. <property> <name>hive.merge.smallfiles.avgsize</name> ...

  8. mapreduce 关于小文件导致任务缓慢的问题

    小文件导致任务执行缓慢的原因: 1.很容易想到的是map task 任务启动太多,而每个文件的实际输入量很小,所以导致了任务缓慢 这个可以通过 CombineTextInputFormat,解决,主要 ...

  9. &lbrack;转载&rsqb;mapreduce合并小文件成sequencefile

    mapreduce合并小文件成sequencefile http://blog.csdn.net/xiao_jun_0820/article/details/42747537

随机推荐

  1. Daily Scrum – 1&sol;12

    Meeting Minutes Merge Wordlist & Word Recite entry. (P0) – Done. Remove "Word Challenge&quo ...

  2. ArcPad 10 的安装和部署

    ArcPad它被安装在一个手持装置或业内外的移动终端ArcGIS产品,那ArcPad这是Esri软件产品,哦,不是硬件. 虽然优于ArcGIS Desktop功能复杂的乐趣,是对于野外作业.数据採集等 ...

  3. 用PHP与XML进行网站编程

    一.小序 HTML简单易学又通用,一般的PHP程序就是嵌入在HTML语言之中实现的.但是随着WEB越来越广泛的应用,HTML的弱点也越来越明显了.XML的出现,弥补了这些不足,它提供了一个能够处理互联 ...

  4. 《HelloGitHub》第 25 期

    <HelloGitHub>第 25 期 兴趣是最好的老师,HelloGitHub 就是帮你找到兴趣! 简介 分享 GitHub 上有趣.入门级的开源项目. 这是一个面向编程新手.热爱编程. ...

  5. python购物车作业

    # -*- coding:utf8 -*- # Author:Wang Yao goods = [{"name": "电脑", "price&quot ...

  6. java web项目中打开资源文件中文乱码

    1 java web项目中经常使用多模块管理.在某一个模块中添加了一些资源文件.但不是启动项目.有时候需要在程序中读取资源文件内容,打包后放到容器中就不能正常运行了.需要将所有资源文件放到启动项目的 ...

  7. 【第三十二章】 elk(3)- broker架构 &plus; 引入logback

    实际中最好用的日志框架是logback,我们现在会直接使用logback通过tcp协议向logstash-shipper输入日志数据.在上一节的基础上修改!!! 一.代码 1.pom.xml 1 &l ...

  8. CXSprite&period;h文件

    #ifndef __XSprite_H__ #define __XSprite_H__ #include "CocoHead.h" #define BTN_FRAME_AMOUNT ...

  9. 《Beginning Java 7》 - 2 - Cloning 克隆

    Cloning 分两类:影子克隆 shallow cloning 深度克隆 deep cloning * 调用 clone() 需要 implments Cloneable.此函数为 protecte ...

  10. AMQP &amp&semi;&amp&semi; MQTT comparision

    1. AMQP (Advanced Message Queuing Protocol) 2. MQTT (Message Queuing Telemetry Transport) Introducti ...