MapReduce Case 11 — Movie Review Analysis 4 (Comparing One User's Ratings with the Overall Average Ratings)

Date: 2023-02-02 18:23:36

Problem:

We have the following three data files:

1. users.dat — record format: 2::M::56::16::70072
Fields: UserID BigInt, Gender String, Age Int, Occupation String, Zipcode String
Field meanings: user ID, gender, age, occupation, zip code

2. movies.dat — record format: 2::Jumanji (1995)::Adventure|Children's|Fantasy
Fields: MovieID BigInt, Title String, Genres String
Field meanings: movie ID, movie title, movie genres

3. ratings.dat — record format: 1::1193::5::978300760
Fields: UserID BigInt, MovieID BigInt, Rating Double, Timestamp String
Field meanings: user ID, movie ID, rating, rating timestamp

The three files have already been joined into a single input file (/a/totalFilmInfos.txt, used by the jobs below) with the layout: user ID, movie ID, rating, rating timestamp, gender, age, occupation, zip code, movie title, movie genres.

(4) For the female user who watches the most movies (i.e. has the most ratings), take the 10 movies she rated highest and compute each one's average rating over all users (output: user, movie title, average rating).
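
For reference, here is a minimal sketch of parsing one joined record (the sample line and class name are hypothetical; the field indices match the mappers later in this post):

public class JoinedRecordParseSketch {
	public static void main(String[] args) {
		// hypothetical sample line from /a/totalFilmInfos.txt, in the field order listed above
		String line = "1::1193::5::978300760::F::1::10::48067::One Flew Over the Cuckoo's Nest (1975)::Drama";
		String[] f = line.split("::");           // the joined file is "::"-delimited
		String userId = f[0];                    // user ID
		String movieId = f[1];                   // movie ID
		int rating = Integer.parseInt(f[2]);     // rating
		String gender = f[4];                    // gender ("F"/"M")
		String title = f[8];                     // movie title
		System.out.println(userId + "\t" + movieId + "\t" + rating + "\t" + gender + "\t" + title);
	}
}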

Problem analysis: this case can be broken into 4 steps.

Step 1: count the number of ratings per female user (key: uid, value: rating count).

Step 2: pick the most active viewer, i.e. sort the Step 1 output by rating count in descending order and take the first record.

Step 3: for the user found in Step 2, select her 10 highest-rated movies (key: movieid plus rating, sorted by rating in descending order, keep the top 10).

Step 4: compute the average rating (over all users) of those 10 movies and assemble the output: movie ID, movie title, average rating, user ID.

The task is implemented as 4 MapReduce jobs chained serially with JobControl. Because each job depends on the previous job's output, the intermediate result files are shipped via the distributed cache and loaded into memory in setup(), so debugging has to be done on the cluster.
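
A minimal sketch of that JobControl chaining pattern (the class and method names here are hypothetical; the full driver with all four jobs appears below):

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

public class JobChainSketch {
	/** Chain two already-configured jobs so jobB only starts after jobA succeeds. */
	public static void runChained(Job jobA, Job jobB) throws Exception {
		ControlledJob a = new ControlledJob(jobA.getConfiguration());
		ControlledJob b = new ControlledJob(jobB.getConfiguration());
		a.setJob(jobA);
		b.setJob(jobB);
		b.addDependingJob(a);       // declare the dependency: B runs only after A

		JobControl jc = new JobControl("chain");
		jc.addJob(a);
		jc.addJob(b);

		Thread t = new Thread(jc);  // JobControl implements Runnable
		t.start();
		while (!jc.allFinished()) { // poll until every job in the chain has finished
			Thread.sleep(1000);
		}
		jc.stop();
	}
}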

Step 2 relies on a custom key class for sorting (rating count in descending order) and a grouping comparator for grouping (by gender): the reducer then sees the whole female group sorted by count and only needs to emit the first key.

Sort key code:

/**
 * @author: lpj   
 * @date: 2018年3月18日 下午1:13:02
 * @Description:
 */
package lpj.filmBean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Composite key for Step 2: gender + uid + rating count.
 * Sorts by gender first, then by rating count in descending order.
 */
public class FMaxRateBean implements WritableComparable<FMaxRateBean>{
	private String sex;
	public String getSex() {
		return sex;
	}
	public void setSex(String sex) {
		this.sex = sex;
	}
	private String uid;
	private int num;
	public String getUid() {
		return uid;
	}
	public void setUid(String uid) {
		this.uid = uid;
	}
	public int getNum() {
		return num;
	}
	public void setNum(int num) {
		this.num = num;
	}
	
	/**
	 * @param sex
	 * @param uid
	 * @param num
	 */
	public FMaxRateBean(String sex, String uid, int num) {
		super();
		this.sex = sex;
		this.uid = uid;
		this.num = num;
	}
	/** No-arg constructor required by Hadoop for deserialization. */
	public FMaxRateBean() {
	}
	
	@Override
	public String toString() {
		return sex + "\t" + uid + "\t" + num;
	}
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(sex);
		out.writeUTF(uid);
		out.writeInt(num);
	}
	@Override
	public void readFields(DataInput in) throws IOException {
		sex = in.readUTF();
		uid = in.readUTF();
		num = in.readInt();
	}
	@Override
	public int compareTo(FMaxRateBean o) {
		int diff = this.sex.compareTo(o.sex);
		if (diff == 0) {
			// same gender: larger rating count sorts first
			return o.num - this.num;
		} else {
			return diff;
		}
	}
	

}

Grouping comparator code:

/**
 * @author: lpj   
 * @date: 2018年3月18日 下午1:59:06
 * @Description:
 */
package lpj.filmBean;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator for Step 2: groups keys by gender only,
 * so the reducer sees every female user's count in a single group.
 */
public class FMaxgroup extends WritableComparator{
	public FMaxgroup() {
		super(FMaxRateBean.class, true); // instantiate FMaxRateBean keys for comparison
	}

	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		FMaxRateBean f1 = (FMaxRateBean)a;
		FMaxRateBean f2 = (FMaxRateBean)b;
		return f1.getSex().compareTo(f2.getSex());
	}
	
	

}

Step 3 needs a key that sorts each user's ratings in descending order and a grouping comparator that groups by user ID.

Sort key code:

/**
 * @author: lpj   
 * @date: 2018年3月18日 下午2:16:58
 * @Description:
 */
package lpj.filmBean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Composite key for Step 3: uid + movie id + rating.
 * Sorts by uid first, then by rating in descending order.
 */
public class FUserMoviebean implements WritableComparable<FUserMoviebean>{
	private String uid;
	private String moiveid;
	private int rate;
	public String getUid() {
		return uid;
	}
	public void setUid(String uid) {
		this.uid = uid;
	}
	public String getMoiveid() {
		return moiveid;
	}
	public void setMoiveid(String moiveid) {
		this.moiveid = moiveid;
	}
	public int getRate() {
		return rate;
	}
	public void setRate(int rate) {
		this.rate = rate;
	}
	/**
	 * @param uid
	 * @param moiveid
	 * @param rate
	 */
	public FUserMoviebean(String uid, String moiveid, int rate) {
		super();
		this.uid = uid;
		this.moiveid = moiveid;
		this.rate = rate;
	}
	/** No-arg constructor required by Hadoop for deserialization. */
	public FUserMoviebean() {
	}
	@Override
	public String toString() {
		return uid + "\t" + moiveid + "\t" + rate;
	}
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(uid);
		out.writeUTF(moiveid);
		out.writeInt(rate);
		
	}
	@Override
	public void readFields(DataInput in) throws IOException {
		uid = in.readUTF();
		moiveid = in.readUTF();
		rate = in.readInt();
		
	}
	@Override
	public int compareTo(FUserMoviebean o) {
		int diff = this.uid.compareTo(o.uid);
		if (diff == 0) {
			// same user: higher rating sorts first
			return o.rate - this.rate;
		} else {
			return diff;
		}
	}
	

}

Grouping comparator code:

/**
 * @author: lpj   
 * @date: 2018年3月18日 下午2:21:30
 * @Description:
 */
package lpj.filmBean;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator for Step 3: groups keys by uid only,
 * so all of one user's ratings reach the reducer as a single group, already sorted by rating.
 */
public class FUserMoiveGroup extends WritableComparator{
	public FUserMoiveGroup() {
		super(FUserMoviebean.class, true); // instantiate FUserMoviebean keys for comparison
	}

	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		FUserMoviebean f1 = (FUserMoviebean)a;
		FUserMoviebean f2 = (FUserMoviebean)b;
		return f1.getUid().compareTo(f2.getUid());
	}
	

}

Driver and mapper/reducer code:

/**
 * @author: lpj   
 * @date: 2018年3月16日 下午7:16:47
 * @Description:
 */
package lpj.filmCritic;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import lpj.filmBean.FMaxRateBean;
import lpj.filmBean.FMaxgroup;
import lpj.filmBean.FUserMoiveGroup;
import lpj.filmBean.FUserMoviebean;
/**
 * Driver that chains the four jobs for sub-question (4).
 */
public class UserMoiveAvgrateMR {
	
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
		System.setProperty("HADOOP_USER_NAME", "hadoop");
		//------------------------------------------------
		FileSystem fs = FileSystem.get(conf);// uses the HDFS set in fs.defaultFS
		Job job = Job.getInstance(conf);
		job.setJarByClass(UserMoiveAvgrateMR.class);
		job.setMapperClass(UserMoiveAvgrateMR_Mapper.class);
		job.setReducerClass(UserMoiveAvgrateMR_Reducer.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(NullWritable.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		Path inputPath = new Path("/a/totalFilmInfos.txt");
		Path outputPath = new Path("/a/homework11_4_1");
		if (fs.exists(outputPath)) {
			fs.delete(outputPath, true);
		}
		FileInputFormat.setInputPaths(job, inputPath);
		FileOutputFormat.setOutputPath(job, outputPath);
		//------------------------------------------------
		FileSystem fs2 = FileSystem.get(conf);// uses the HDFS set in fs.defaultFS
		Job job2 = Job.getInstance(conf);
		job2.setJarByClass(UserMoiveAvgrateMR.class);
		job2.setMapperClass(UserMoiveAvgrateMR2_Mapper.class);
		job2.setReducerClass(UserMoiveAvgrateMR2_Reducer.class);
		job2.setGroupingComparatorClass(FMaxgroup.class);
		job2.setOutputKeyClass(FMaxRateBean.class);
		job2.setOutputValueClass(NullWritable.class);
		Path inputPath2 = new Path("/a/homework11_4_1");
		Path outputPath2 = new Path("/a/homework11_4_2");
		if (fs2.exists(outputPath2)) {
			fs2.delete(outputPath2, true);
		}
		FileInputFormat.setInputPaths(job2, inputPath2);
		FileOutputFormat.setOutputPath(job2, outputPath2);
		//------------------------------------------------
		FileSystem fs3 = FileSystem.get(conf);// uses the HDFS set in fs.defaultFS
		Job job3 = Job.getInstance(conf);
		job3.setJarByClass(UserMoiveAvgrateMR.class);
		job3.setMapperClass(UserMoiveAvgrateMR3_Mapper.class);
		job3.setReducerClass(UserMoiveAvgrateMR3_Reducer.class);
		job3.setGroupingComparatorClass(FUserMoiveGroup.class);
		job3.setOutputKeyClass(FUserMoviebean.class);
		job3.setOutputValueClass(NullWritable.class);
		URI uri = new URI("/a/homework11_4_2/part-r-00000");
		job3.addCacheFile(uri);
		Path inputPath3 = new Path("/a/totalFilmInfos.txt");
		Path outputPath3 = new Path("/a/homework11_4_3");
		if (fs3.exists(outputPath3)) {
			fs3.delete(outputPath3, true);
		}
		FileInputFormat.setInputPaths(job3, inputPath3);
		FileOutputFormat.setOutputPath(job3, outputPath3);
		//------------------------------------------------
		FileSystem fs4 = FileSystem.get(conf);// uses the HDFS set in fs.defaultFS
		Job job4 = Job.getInstance(conf);
		job4.setJarByClass(UserMoiveAvgrateMR.class);
		job4.setMapperClass(UserMoiveAvgrateMR4_Mapper.class);
		job4.setReducerClass(UserMoiveAvgrateMR4_Reducer.class);
		job4.setOutputKeyClass(Text.class);
		job4.setOutputValueClass(Text.class);
		URI uri4 = new URI("/a/homework11_4_3/part-r-00000");
		job4.addCacheFile(uri4);
		Path inputPath4 = new Path("/a/totalFilmInfos.txt");
		Path outputPath4 = new Path("/a/homework11_4_4");
		if (fs4.exists(outputPath4)) {
			fs4.delete(outputPath4, true);
		}
		FileInputFormat.setInputPaths(job4, inputPath4);
		FileOutputFormat.setOutputPath(job4, outputPath4);
		//------------------------------------------------
		ControlledJob aJob = new ControlledJob(job.getConfiguration());
		ControlledJob bJob = new ControlledJob(job2.getConfiguration());
		ControlledJob cJob = new ControlledJob(job3.getConfiguration());
		ControlledJob dJob = new ControlledJob(job4.getConfiguration());
		aJob.setJob(job);
		bJob.setJob(job2);
		cJob.setJob(job3);
		dJob.setJob(job4);
		JobControl jc = new JobControl("jc");
		jc.addJob(aJob);
		jc.addJob(bJob);
		jc.addJob(cJob);
		jc.addJob(dJob);
		bJob.addDependingJob(aJob);
		cJob.addDependingJob(bJob);
		dJob.addDependingJob(cJob);
		Thread thread = new Thread(jc);
		thread.start();
		while (!jc.allFinished()) {
			Thread.sleep(1000); // poll until every job in the chain has finished
		}
		jc.stop();
	}
	//(4) For the female user (uid) with the most ratings, find the 10 movies (movieid) she rated highest and compute each one's average rating avgrate (user, movie title, rating)
	//------------- Step 1: count ratings per female user (uid) ----------------
	//input record: userid, movieId, rate, ts, gender, age, occupation, zipcode, movieName, movieType
	public static class UserMoiveAvgrateMR_Mapper extends Mapper<LongWritable, Text, Text, NullWritable>{
		Text kout = new Text();
		Text valueout = new Text();
		@Override
		protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {
			String [] reads = value.toString().trim().split("::");
			String sex = reads[4];
			String userid = reads[0];
			if ("F".equals(sex)) { // keep female users only
				kout.set("F" + "\t" + userid);
				context.write(kout, NullWritable.get());
			}
		}
	}
	public static class UserMoiveAvgrateMR_Reducer extends Reducer<Text, NullWritable, Text, Text>{
		Text kout = new Text();
		Text valueout = new Text();
		@Override
		protected void reduce(Text key, Iterable<NullWritable> values, Context context)throws IOException, InterruptedException {
			int rateNum = 0;
			for(NullWritable inv : values){
				rateNum++;
			}
			valueout.set(rateNum+"");
			context.write(key, valueout);
		}
		
	}
	//---------- Step 2: pick the female user with the most ratings ----------
	public static class UserMoiveAvgrateMR2_Mapper extends Mapper<LongWritable, Text, FMaxRateBean, NullWritable>{
		Text kout = new Text();
		Text valueout = new Text();
		FMaxRateBean fm = new FMaxRateBean();
		@Override
		protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {
			String [] reads = value.toString().trim().split("\t");
			fm.setSex(reads[0]);
			fm.setUid(reads[1]);
			fm.setNum(Integer.parseInt(reads[2]));
			context.write(fm, NullWritable.get());
		
		}
	}
	public static class UserMoiveAvgrateMR2_Reducer extends Reducer<FMaxRateBean, NullWritable, FMaxRateBean, NullWritable>{
		Text kout = new Text();
		Text valueout = new Text();
		@Override
		protected void reduce(FMaxRateBean key, Iterable<NullWritable> values, Context context)throws IOException, InterruptedException {
			int count = 0;
			for(NullWritable inv : values){
				count++;
				if (count <= 1) { // keys arrive sorted by rating count descending, so the first one is the winner
					context.write(key, NullWritable.get());
				}else {
					return;
				}
			}
		}
		
	}
	//---------------- Step 3: the 10 movies she rated highest (movieid)
	//intermediate record: uid	moiveid	rate
	public static class UserMoiveAvgrateMR3_Mapper extends Mapper<LongWritable, Text, FUserMoviebean, NullWritable>{
		private static String userInfo = "";
		@SuppressWarnings("deprecation")
		@Override
		protected void setup(Context context)throws IOException, InterruptedException {
			Path[] paths = context.getLocalCacheFiles();
			String str = paths[0].toUri().toString();
			String readline = null;
			BufferedReader bf = new BufferedReader(new FileReader(new File(str)));
			while((readline = bf.readLine()) != null){
				userInfo = readline; // the Step 2 output has a single line, e.g. F	1150	1302
			}
			System.out.println(userInfo);
			IOUtils.closeStream(bf);
		}
		Text kout = new Text();
		Text valueout = new Text();
		
		FUserMoviebean fu = new FUserMoviebean();
		@Override
		protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {
			String [] userinfos = userInfo.split("\t");
			String userid = userinfos[1];
			//input record: userid, movieId, rate, ts, gender, age, occupation, zipcode, movieName, movieType
			String [] reads = value.toString().trim().split("::");// note the delimiter: the source file uses "::", not "\t"
			if (reads[0].equals(userid)) {
				fu.setUid(reads[0]);
				fu.setMoiveid(reads[1]);
				fu.setRate(Integer.parseInt(reads[2]));
				context.write(fu, NullWritable.get());
			}
		}
	}
	public static class UserMoiveAvgrateMR3_Reducer extends Reducer<FUserMoviebean, NullWritable, FUserMoviebean, NullWritable>{
		Text kout = new Text();
		Text valueout = new Text();
		@Override
		protected void reduce(FUserMoviebean key, Iterable<NullWritable> values, Context context)throws IOException, InterruptedException {
			int count = 0;
			for(NullWritable inv : values){
				count++;
				if (count <= 10) {// keep the 10 highest-rated movies (keys are sorted by rating descending)
					context.write(key, NullWritable.get());
				}else {
					return;
				}
			}
		}
		
	}
	//-------------------- Step 4: average rating (avgrate) of those 10 movies over all users (user, movie title, rating)
	//cached Step 3 record: 1150	1230	5  (uid, movieid, rate)
	//input record: userid, movieId, rate, ts, gender, age, occupation, zipcode, movieName, movieType
	public static class UserMoiveAvgrateMR4_Mapper extends Mapper<LongWritable, Text, Text, Text>{
		private static List<String> movidmap = new ArrayList<>();
		@SuppressWarnings("deprecation")
		@Override
		protected void setup(Context context)throws IOException, InterruptedException {
			Path[] paths = context.getLocalCacheFiles();
			String str = paths[0].toUri().toString();
			String readline = null;
			BufferedReader bf = new BufferedReader(new FileReader(new File(str)));
			while((readline = bf.readLine()) != null){
				movidmap.add(readline);// one line per selected movie, e.g. 1150	1230	5
			}
			IOUtils.closeStream(bf);
		}
		Text kout = new Text();
		Text valueout = new Text();
		
		// input record ("::"-delimited): userid, movieId, rate, ts, gender, age, occupation, zipcode, movieName, movieType
		@Override
		protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {
			String [] reads = value.toString().split("::");
			String movieid = reads[1];
			for(int i = 0; i < movidmap.size(); i++){ // map-side join against the 10 selected movies
				String movieidKnow = movidmap.get(i).split("\t")[1];
				String uid = movidmap.get(i).split("\t")[0];
				if (movieid.equals(movieidKnow)) {
					kout.set(movieid);
					valueout.set(reads[8] + "\t" + reads[2] + "\t" + uid); // movieName \t rate \t uid
					context.write(kout, valueout);
				}
			}
			
		}
	}
	public static class UserMoiveAvgrateMR4_Reducer extends Reducer<Text, Text, Text, Text>{
		Text kout = new Text();
		Text valueout = new Text();
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {
			int count = 0;
			int totalRate = 0;
			double avgRate = 0;
			String moivname = "";
			String uid = "";
			for(Text inv : values){ // every rating of this movie, from all users
				count++;
				String[] split = inv.toString().split("\t");
				totalRate += Integer.parseInt(split[1]);
				moivname = split[0];
				uid = split[2];
			}
			avgRate = 1.0 * totalRate / count;
			DecimalFormat df = new DecimalFormat("#.#");
			String avg = df.format(avgRate);
			String vv = moivname + "\t" + avg + "\t" + uid;
			valueout.set(vv);
			context.write(key, valueout);
		}
		
	}
	
}
 

Intermediate result of Step 1 (excerpt):

F	1	53
F	10	401
F	1000	84
F	101	106
F	1012	55
F	1014	39
F	1024	43
F	1026	44
F	1028	61
F	1034	337
F	1038	148
F	1039	28
F	1043	36
F	1045	37

Intermediate result of Step 2:

F	1150	1302

Intermediate result of Step 3:

1150	951	5
1150	3671	5
1150	3307	5
1150	1230	5
1150	904	5
1150	162	5
1150	3675	5
1150	1966	5
1150	3163	5
1150	2330	5

Final result:

1230	Annie Hall (1977)	4.1	1150
162	Crumb (1994)	4.1	1150
1966	Metropolitan (1990)	3.6	1150
2330	Hands on a Hard Body (1996)	4.2	1150
3163	Topsy-Turvy (1999)	3.7	1150
3307	City Lights (1931)	4.4	1150
3671	Blazing Saddles (1974)	4	1150
3675	White Christmas (1954)	3.8	1150
904	Rear Window (1954)	4.5	1150
951	His Girl Friday (1940)	4.2	1150

Summary: because the task has to be split into several interdependent steps, it is easiest to develop and debug each step separately and only run the full chain at the end.

Issues encountered along the way: different files use different delimiters (the joined source file uses "::" while the intermediate MapReduce outputs use "\t"), and the dependency files have to be registered on each job with addCacheFile and read back in setup(). A minimal sketch of that pattern is shown below.
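
The sketch below assumes the same deprecated getLocalCacheFiles() call used in the code above; the class names are illustrative and the path mirrors the driver:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

public class CacheFileSketch {
	/** Driver side: register the previous step's output as a cache file before submitting the job. */
	public static void addDependencyFile(Job job) throws Exception {
		job.addCacheFile(new URI("/a/homework11_4_2/part-r-00000"));
	}

	/** Mapper side: read the cached file once in setup(), before any map() call. */
	public static class CacheAwareMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
		private final List<String> cached = new ArrayList<>();

		@SuppressWarnings("deprecation")
		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			Path[] paths = context.getLocalCacheFiles(); // deprecated but used the same way as in the code above
			try (BufferedReader br = new BufferedReader(new FileReader(paths[0].toUri().getPath()))) {
				String line;
				while ((line = br.readLine()) != null) {
					cached.add(line); // intermediate MR output is "\t"-delimited
				}
			}
		}

		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			String[] fields = value.toString().split("::"); // the joined source file is "::"-delimited
			// ... join `fields` against `cached` here ...
		}
	}
}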