MapReduce Testing on a YARN Cluster (Part 4)

Date: 2023-03-09 09:57:00

Group and sort mobile-phone traffic usage data.

Test preparation:

First synchronize the clocks, then on master start the HDFS cluster and then the YARN cluster; verify with jps:

On master: first NameNode and SecondaryNameNode, then ResourceManager;

On the slaves: first DataNode, then NodeManager;

If HDFS and YARN start successfully on master but some daemons on a slave node do not, you can start them manually with:

hadoop-daemon.sh start datanode
yarn-daemon.sh start nodemanager

Then create a phoneflow folder under the local "/home/hadoop/test/" directory and put all the data files to be analyzed into it;
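Each line of these data files is expected to carry three whitespace-separated fields, in the order the mapper below parses them: phone number, upload traffic, and download traffic. A hypothetical example (these numbers are made up purely to illustrate the format):

 13800000001 120 380
 13800000002 2481 24681
 13800000001 540 160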


Test goals:

Goal 1: the output is grouped by phone number, then sorted by the sum of upload and download traffic;

Goal 2: the output is grouped by phone number, then sorted by upload traffic first, falling back to the sum of upload and download traffic when the upload traffic is equal;

Test code

Goal 1:

Because sorting is involved, what we emit is a wrapped Flow object (which itself carries all the per-record information);

For grouping, the Flow class must implement the Serializable interface;

For sorting, the Flow class must additionally implement the WritableComparable interface, overriding the write and readFields methods as well as the compareTo method;

 package com.mmzs.bigdata.yarn.mapreduce;

 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;

 import org.apache.hadoop.io.WritableComparable;

 public class Flow implements WritableComparable<Flow>, Serializable {

     private String phoneNum; // phone number
     private Long upFlow;     // upload traffic
     private Long downFlow;   // download traffic

     public Flow() {}

     public Flow(String phoneNum, Long upFlow, Long downFlow) {
         super();
         this.phoneNum = phoneNum;
         this.upFlow = upFlow;
         this.downFlow = downFlow;
     }

     public Long getTotalFlow() {
         return upFlow + downFlow;
     }

     // Fields are read back in the reducer in exactly the order they are written here.
     // write is the serialization step.
     @Override
     public void write(DataOutput out) throws IOException {
         out.writeUTF(phoneNum);
         out.writeLong(upFlow);
         out.writeLong(downFlow);
     }

     // readFields is the deserialization step.
     @Override
     public void readFields(DataInput in) throws IOException {
         this.phoneNum = in.readUTF();
         this.upFlow = in.readLong();
         this.downFlow = in.readLong();
     }

     // The sort key comparison applied to map output keys on their way to the reducer.
     // Note: this deliberately never returns 0, so two phones with the same total
     // are kept as distinct keys instead of being merged into one group.
     @Override
     public int compareTo(Flow flow) {
         Long curTotalFlow = this.getTotalFlow();
         Long paramTotalFlow = flow.getTotalFlow();
         Long resFlow = curTotalFlow - paramTotalFlow;
         return resFlow > 0 ? -1 : 1; // descending by total traffic
     }

     public String getPhoneNum() {
         return phoneNum;
     }

     public void setPhoneNum(String phoneNum) {
         this.phoneNum = phoneNum;
     }

     public Long getUpFlow() {
         return upFlow;
     }

     public void setUpFlow(Long upFlow) {
         this.upFlow = upFlow;
     }

     public Long getDownFlow() {
         return downFlow;
     }

     public void setDownFlow(Long downFlow) {
         this.downFlow = downFlow;
     }

     // Convenience method to set all fields in a single call.
     public void setFlow(String phoneNum, Long upFlow, Long downFlow) {
         this.phoneNum = phoneNum;
         this.upFlow = upFlow;
         this.downFlow = downFlow;
     }

     @Override
     public String toString() {
         return new StringBuilder(phoneNum).append("\t")
                 .append(upFlow).append("\t")
                 .append(downFlow).append("\t")
                 .append(getTotalFlow())
                 .toString();
     }
 }

Flow
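As a quick local sanity check of the compareTo ordering, sorting a list of Flow objects should put the largest total first. This is a sketch with made-up values, meant to run on the client rather than the cluster; FlowSortCheck is not part of the original project:

 package com.mmzs.bigdata.yarn.mapreduce;

 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;

 public class FlowSortCheck {
     public static void main(String[] args) {
         List<Flow> flows = new ArrayList<Flow>();
         flows.add(new Flow("13800000001", 100L, 200L)); // total 300
         flows.add(new Flow("13800000002", 500L, 400L)); // total 900
         flows.add(new Flow("13800000003", 250L, 250L)); // total 500
         // Collections.sort uses Flow.compareTo: descending by total traffic
         Collections.sort(flows);
         for (Flow f : flows) {
             System.out.println(f); // expected order: ...2 (900), ...3 (500), ...1 (300)
         }
     }
 }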

 package com.mmzs.bigdata.yarn.mapreduce;

 import java.io.IOException;

 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;

 public class PhoneFlowMapper01 extends Mapper<LongWritable, Text, Text, Flow> {

     private Text outKey;
     private Flow outValue;

     @Override
     protected void setup(Mapper<LongWritable, Text, Text, Flow>.Context context)
             throws IOException, InterruptedException {
         outKey = new Text();
         outValue = new Flow();
     }

     @Override
     protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Flow>.Context context)
             throws IOException, InterruptedException {
         String line = value.toString();
         String[] fields = line.split("\\s+");
         // Filter out invalid or incomplete records.
         if (fields.length < 3) return;

         String phoneNum = fields[0];
         String upFlow = fields[1];
         String downFlow = fields[2];

         outKey.set(phoneNum);
         outValue.setFlow(phoneNum, Long.parseLong(upFlow), Long.parseLong(downFlow));
         context.write(outKey, outValue);
     }

     @Override
     protected void cleanup(Mapper<LongWritable, Text, Text, Flow>.Context context)
             throws IOException, InterruptedException {
         outKey = null;
         outValue = null;
     }
 }

PhoneFlowMapper01

 package com.mmzs.bigdata.yarn.mapreduce;

 import java.io.IOException;
 import java.util.Iterator;

 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Reducer;

 public class PhoneFlowReducer01 extends Reducer<Text, Flow, NullWritable, Flow> {

     private NullWritable outKey;
     private Flow outValue;

     @Override
     protected void setup(Reducer<Text, Flow, NullWritable, Flow>.Context context)
             throws IOException, InterruptedException {
         outKey = NullWritable.get();
         outValue = new Flow();
     }

     @Override
     protected void reduce(Text key, Iterable<Flow> values, Reducer<Text, Flow, NullWritable, Flow>.Context context)
             throws IOException, InterruptedException {
         Iterator<Flow> its = values.iterator();
         Long totalUpFlow = 0L;   // must be initialized to 0L here, not null
         Long totalDownFlow = 0L;
         while (its.hasNext()) {
             Flow flow = its.next();
             totalUpFlow += flow.getUpFlow();     // don't forget the + when summing
             totalDownFlow += flow.getDownFlow();
         }
         outValue.setFlow(key.toString(), totalUpFlow, totalDownFlow);
         context.write(outKey, outValue);
     }

     @Override
     protected void cleanup(Reducer<Text, Flow, NullWritable, Flow>.Context context)
             throws IOException, InterruptedException {
         outValue = null;
     }
 }

PhoneFlowReducer01

 package com.mmzs.bigdata.yarn.mapreduce;

 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 /**
  * @author hadoop
  */
 public class PhoneFlowDriver01 {

     private static FileSystem fs;
     private static Configuration conf;

     static {
         String uri = "hdfs://master01:9000/";
         conf = new Configuration();
         try {
             fs = FileSystem.get(new URI(uri), conf, "hadoop");
         } catch (IOException e) {
             e.printStackTrace();
         } catch (InterruptedException e) {
             e.printStackTrace();
         } catch (URISyntaxException e) {
             e.printStackTrace();
         }
     }

     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
         Job pfJob01 = getJob(args);
         if (null == pfJob01) {
             return;
         }
         // Submit the Job to the cluster and wait for it to finish; the true
         // argument streams the Job's progress back to the client.
         boolean flag = false;
         flag = pfJob01.waitForCompletion(true);
         System.exit(flag ? 0 : 1);
     }

     /**
      * Build the Job instance.
      * @param args
      * @return
      * @throws IOException
      */
     public static Job getJob(String[] args) throws IOException {
         if (null == args || args.length < 2) return null;
         // HDFS path holding the data to process
         Path inputPath = new Path(args[0]);
         // Output path for the Job's results
         Path outputPath = new Path(args[1]);

         // Get the Job instance
         Job pfJob01 = Job.getInstance(conf, "pfJob01");
         // Set the entry class of the jar;
         // the entry point of pfJob01 is the PhoneFlowDriver01 class
         pfJob01.setJarByClass(PhoneFlowDriver01.class);
         // Set the Mapper class used by the Job
         pfJob01.setMapperClass(PhoneFlowMapper01.class);
         // Set the Reducer class (can be omitted if the Job has no Reducer)
         pfJob01.setReducerClass(PhoneFlowReducer01.class);

         // MapTask output key type
         pfJob01.setMapOutputKeyClass(Text.class);
         // MapTask output value type
         pfJob01.setMapOutputValueClass(Flow.class);

         // Overall Job output key type (can be omitted if the Job has no Reducer)
         pfJob01.setOutputKeyClass(NullWritable.class);
         // Overall Job output value type (can be omitted if the Job has no Reducer)
         pfJob01.setOutputValueClass(Flow.class);

         // Input path of the data the Job processes
         FileInputFormat.setInputPaths(pfJob01, inputPath);
         // Output path for the Job's results
         FileOutputFormat.setOutputPath(pfJob01, outputPath);

         return pfJob01;
     }
 }

PhoneFlowDriver01

 package com.mmzs.bigdata.yarn.mapreduce;

 import java.io.IOException;

 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;

 public class PhoneFlowMapper02 extends Mapper<LongWritable, Text, Flow, NullWritable> {

     private Flow outKey;
     private NullWritable outValue;

     @Override
     protected void setup(Mapper<LongWritable, Text, Flow, NullWritable>.Context context)
             throws IOException, InterruptedException {
         outKey = new Flow();
         outValue = NullWritable.get();
     }

     @Override
     protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Flow, NullWritable>.Context context)
             throws IOException, InterruptedException {
         String line = value.toString();
         String[] fields = line.split("\\s+");

         String phoneNum = fields[0];
         String upFlow = fields[1];
         String downFlow = fields[2];
         // The parsed fields are Strings, so convert them before setting;
         // fields[3] (the total written by Job01) is ignored, Flow recomputes it.
         outKey.setFlow(phoneNum, Long.parseLong(upFlow), Long.parseLong(downFlow));
         context.write(outKey, outValue);
     }

     @Override
     protected void cleanup(Mapper<LongWritable, Text, Flow, NullWritable>.Context context)
             throws IOException, InterruptedException {
         outKey = null;
         outValue = null;
     }
 }

PhoneFlowMapper02

 package com.mmzs.bigdata.yarn.mapreduce;

 import java.io.IOException;

 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Reducer;

 public class PhoneFlowReducer02 extends Reducer<Flow, NullWritable, Flow, NullWritable> {

     private NullWritable outValue;

     @Override
     protected void setup(Reducer<Flow, NullWritable, Flow, NullWritable>.Context context)
             throws IOException, InterruptedException {
         outValue = NullWritable.get();
     }

     @Override
     protected void reduce(Flow key, Iterable<NullWritable> values, Reducer<Flow, NullWritable, Flow, NullWritable>.Context context)
             throws IOException, InterruptedException {
         // This reduce stage cannot be omitted: the sorting happens during the
         // shuffle phase, which calls Flow.compareTo on the map output keys.
         context.write(key, outValue);
     }

     @Override
     protected void cleanup(Reducer<Flow, NullWritable, Flow, NullWritable>.Context context)
             throws IOException, InterruptedException {
         outValue = null;
     }
 }

PhoneFlowReducer02
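One subtlety worth noting: because Flow.compareTo never returns 0, the shuffle never treats two records as the same key, so each phone number's summary line from the first Job reaches this reduce as its own group and none are merged away.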

 package com.mmzs.bigdata.yarn.mapreduce;

 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 /**
  * @author hadoop
  */
 public class PhoneFlowDriver02 {

     private static FileSystem fs;
     private static Configuration conf;

     static {
         String uri = "hdfs://master01:9000/";
         conf = new Configuration();
         try {
             fs = FileSystem.get(new URI(uri), conf, "hadoop");
         } catch (IOException e) {
             e.printStackTrace();
         } catch (InterruptedException e) {
             e.printStackTrace();
         } catch (URISyntaxException e) {
             e.printStackTrace();
         }
     }

     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
         Job pfJob02 = getJob(args);
         if (null == pfJob02) {
             return;
         }
         // Submit the Job to the cluster and wait for it to finish; the true
         // argument streams the Job's progress back to the client.
         boolean flag = false;
         flag = pfJob02.waitForCompletion(true);
         System.exit(flag ? 0 : 1);
     }

     /**
      * Build the Job instance.
      * @param args {src, tmp, dst}: this Job reads the first Job's output from
      *             the tmp path (args[1]) and writes the final result to args[2]
      * @return
      * @throws IOException
      */
     public static Job getJob(String[] args) throws IOException {
         if (null == args || args.length < 3) return null;
         // HDFS path holding the data to process (the output of the first Job)
         Path inputPath = new Path(args[1]);
         // Output path for the Job's results
         Path outputPath = new Path(args[2]);

         // Get the Job instance
         Job pfJob02 = Job.getInstance(conf, "pfJob02");
         // Set the entry class of the jar;
         // the entry point of pfJob02 is the PhoneFlowDriver02 class
         pfJob02.setJarByClass(PhoneFlowDriver02.class);
         // Set the Mapper class used by the Job
         pfJob02.setMapperClass(PhoneFlowMapper02.class);
         // Set the Reducer class (can be omitted if the Job has no Reducer)
         pfJob02.setReducerClass(PhoneFlowReducer02.class);

         // MapTask output key type
         pfJob02.setMapOutputKeyClass(Flow.class);
         // MapTask output value type
         pfJob02.setMapOutputValueClass(NullWritable.class);

         // Overall Job output key type (can be omitted if the Job has no Reducer)
         pfJob02.setOutputKeyClass(Flow.class);
         // Overall Job output value type (can be omitted if the Job has no Reducer)
         pfJob02.setOutputValueClass(NullWritable.class);

         // Input path of the data the Job processes
         FileInputFormat.setInputPaths(pfJob02, inputPath);
         // Output path for the Job's results
         FileOutputFormat.setOutputPath(pfJob02, outputPath);

         return pfJob02;
     }
 }

PhoneFlowDriver02

 package com.mmzs.bigdata.yarn.mapreduce;

 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;

 public class PhoneFlowDriver {

     private static FileSystem fs;
     private static Configuration conf;
     private static final String TEMP = "hdfs://master01:9000/data/phoneflow/tmp";

     static {
         String uri = "hdfs://master01:9000/";
         conf = new Configuration();
         try {
             fs = FileSystem.get(new URI(uri), conf, "hadoop");
         } catch (IOException e) {
             e.printStackTrace();
         } catch (InterruptedException e) {
             e.printStackTrace();
         } catch (URISyntaxException e) {
             e.printStackTrace();
         }
     }

     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
         if (null == args || args.length < 2) {
             System.out.println("At least two arguments are required");
             return;
         }

         Path inputPath = new Path(args[0]);
         Path outputPath = new Path(args[1]);
         Path tmpPath = new Path(TEMP);
         // Local (host) data directory
         File localPath = new File("/home/hadoop/test/phoneflow");

         // Delete the output and temp paths on the cluster if they already exist
         if (fs.exists(outputPath)) fs.delete(outputPath, true);
         if (fs.exists(tmpPath)) fs.delete(tmpPath, true);
         if (!fs.exists(inputPath)) {
             // Create the input path and copy the local data files into it
             boolean flag = fs.mkdirs(inputPath);
             if (!flag) {
                 System.out.println("Failed to create the cluster path");
             }
             File[] files = localPath.listFiles();
             Path[] localPaths = new Path[files.length];
             for (int i = 0; i < files.length; i++) {
                 localPaths[i] = new Path(files[i].getAbsolutePath());
             }
             fs.copyFromLocalFile(false, false, localPaths, inputPath);
         }

         String[] params = {args[0], TEMP, args[1]};

         // Run the first Job
         Job pfJob01 = PhoneFlowDriver01.getJob(params);
         if (null == pfJob01) return;
         // Submit the Job and wait for completion; true streams progress to the client
         boolean flag01 = pfJob01.waitForCompletion(true);
         if (!flag01) {
             System.out.println("pfJob01 running failure......");
             System.exit(1);
         }
         System.out.println("pfJob01 running success......");

         // Run the second Job
         Job pfJob02 = PhoneFlowDriver02.getJob(params);
         if (null == pfJob02) return;
         // Submit the Job and wait for completion; true streams progress to the client
         boolean flag02 = pfJob02.waitForCompletion(true);
         if (flag02) { // After Job02 finishes, delete the intermediate directory and exit
             fs.delete(new Path(TEMP), true);
             System.out.println("pfJob02 running success......");
             System.exit(0);
         }
         System.out.println("pfJob02 running failure......");
         System.exit(1);
     }
 }

PhoneFlowDriver (main class)
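The two Jobs are chained through the TEMP directory: params is {src, tmp, dst}, so PhoneFlowDriver01.getJob reads args[0] and writes its grouped totals to args[1] (the temp path), while PhoneFlowDriver02.getJob reads args[1] and writes the sorted result to args[2]; once both Jobs succeed, the temp directory is deleted.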

Goal 2:

To achieve goal 2, only the sorting logic needs a small change;

So we modify the compareTo method of the Flow class, which is the basis of the sort comparison;

     @Override
     public int compareTo(Flow flow) {
         // Goal 1: sort by total traffic
         // Long curTotalFlow = this.getTotalFlow();
         // Long paramTotalFlow = flow.getTotalFlow();
         // Long resFlow = curTotalFlow - paramTotalFlow;

         // Goal 2: sort by upload traffic first; on a tie, compare total traffic
         Long curUpFlow = this.getUpFlow();
         Long paramUpFlow = flow.getUpFlow();
         Long resFlow = curUpFlow - paramUpFlow;
         // If the upload traffic is the same, compare the total traffic
         if (resFlow == 0) {
             Long curTotalFlow = this.getTotalFlow();
             Long paramTotalFlow = flow.getTotalFlow();
             resFlow = curTotalFlow - paramTotalFlow;
         }
         return resFlow > 0 ? -1 : 1; // descending; deliberately never returns 0
     }

compareTo
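As an aside, the Long subtraction above can overflow for extreme values. A sketch of the same descending, never-zero ordering written with Long.compare instead (this variant is not in the original code) might look like:

     @Override
     public int compareTo(Flow flow) {
         // Compare upload traffic first (arguments reversed for descending order),
         // then fall back to total traffic on a tie; Long.compare avoids the
         // overflow risk of subtraction.
         int res = Long.compare(flow.getUpFlow(), this.getUpFlow());
         if (res == 0) {
             res = Long.compare(flow.getTotalFlow(), this.getTotalFlow());
         }
         // Preserve the original never-return-0 behavior so that equal keys
         // are still kept as distinct records during the shuffle.
         return res != 0 ? res : -1;
     }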

Test results:

Arguments passed at run time:

If running from the Eclipse client: each path argument must be prefixed with the cluster master's URI, i.e. hdfs://master01:9000

Input path argument:  /data/phoneflow/src

Output path argument: /data/phoneflow/dst
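If the project is instead packaged as a jar and submitted from the master node, the invocation and a quick check of the result would look roughly like this (the jar name phoneflow.jar is hypothetical, and part-r-00000 assumes the default output file naming):

 hadoop jar phoneflow.jar com.mmzs.bigdata.yarn.mapreduce.PhoneFlowDriver /data/phoneflow/src /data/phoneflow/dst
 hdfs dfs -cat /data/phoneflow/dst/part-r-00000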


Goal 1:

(screenshots of the Goal 1 job output omitted)

Goal 2:

(screenshot of the Goal 2 job output omitted)