MapReduce: analyzing data from a database and writing the results back to the database

Date: 2022-06-20 10:29:16

The job reads article records (title, author, numberRead, url) from a MySQL table through DBInputFormat, totals each author's reads and article count in the reducer, and writes the per-author results back to a second table through DBOutputFormat.

Create the DBRecord class, which maps one row of the source table:

package myTest;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class DBRecord implements Writable, DBWritable {
    private String title;
    private String author;
    private long numberRead;
    private String url;
    
    // No-arg constructor required so Hadoop can instantiate the class reflectively
    public DBRecord() {
    }
    
    public DBRecord(String title, String author, long numberRead, String url) {
        this.title = title;
        this.author = author;
        this.numberRead = numberRead;
        this.url = url;
    }
    public String getTitle() {
        return title;
    }
    
    public void setTitle(String title) {
        this.title = title;
    }
    
    public String getAuthor() {
        return author;
    }
    
    public void setAuthor(String author) {
        this.author = author;
    }
    
    public long getNumberRead() {
        return numberRead;
    }
    
    public void setNumberRead(long numberRead) {
        this.numberRead = numberRead;
    }
    
    public String getUrl() {
        return url;
    }
    
    public void setUrl(String url) {
        this.url = url;
    }
    

    // Read one row from the ResultSet supplied by DBInputFormat
    @Override
    public void readFields(ResultSet set) throws SQLException {
        this.title = set.getString("title");
        this.author = set.getString("author");
        this.numberRead = set.getLong("numberRead");
        this.url = set.getString("url");
    }

    // Bind this record's fields to the INSERT statement built by DBOutputFormat
    @Override
    public void write(PreparedStatement pst) throws SQLException {
        pst.setString(1, title);
        pst.setString(2, author);
        pst.setLong(3, numberRead);
        pst.setString(4, url);
    }

    // Deserialize fields from the data stream; order must match write(DataOutput)
    @Override
    public void readFields(DataInput in) throws IOException {
        this.title = Text.readString(in);
        this.author = Text.readString(in);
        this.numberRead = in.readLong();
        this.url = Text.readString(in);
    }

    // Serialize fields to the data stream
    @Override
    public void write(DataOutput out) throws IOException {
        Text.writeString(out, this.title);
        Text.writeString(out, this.author);
        out.writeLong(this.numberRead);
        Text.writeString(out, this.url);
    }

    @Override
    public String toString() {
         return "title: " + this.title + " author: " + this.author + " numberRead: " + this.numberRead + " url: " + this.url;  
    }
}
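
The original post does not show the table schemas. For reference, the driver below expects a source table ("sourcess") with columns matching the field list passed to DBInputFormat.setInput, and a target table ("flow") matching the fields passed to DBOutputFormat.setOutput. A minimal sketch of preparing both over plain JDBC follows; the helper class name and all column types are assumptions, so adjust them to your actual schema.

package myTest;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Hypothetical helper: creates the tables the job reads from and writes to.
public class PrepareTables {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:3306/sun_test?characterEncoding=utf8&useSSL=false",
                "root", "Sun@123");
             Statement st = conn.createStatement()) {
            // Source table read by DBInputFormat (fields: title, author, numberRead, url)
            st.executeUpdate("CREATE TABLE IF NOT EXISTS sourcess ("
                    + "title VARCHAR(255), author VARCHAR(64), "
                    + "numberRead BIGINT, url VARCHAR(255))");
            // Target table written by DBOutputFormat (fields: userName, numberRead, numberArticle)
            st.executeUpdate("CREATE TABLE IF NOT EXISTS flow ("
                    + "userName VARCHAR(64), numberRead BIGINT, numberArticle BIGINT)");
        }
    }
}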

Create the FlowBean class, which serves as the map output value and, through its DBWritable methods, as the record written back to the database:

package flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class FlowBean implements WritableComparable<FlowBean>, DBWritable {

    private String userName;
    private long numberRead;
    private long numberArticle;
    
    // During deserialization, reflection needs to call a no-arg constructor,
    // so one is defined explicitly
    public FlowBean() {}
    
    public FlowBean(String userName, long numberRead, long numberArticle) {
        this.userName = userName;
        this.numberRead = numberRead;
        this.numberArticle = numberArticle;
    }
    
    public String getUserName() {
        return userName;
    }
    
    public void setUserName(String userName) {
        this.userName = userName;
    }
    
    public long getNumberRead() {
        return numberRead;
    }
    
    public void setNumberRead(long numberRead) {
        this.numberRead = numberRead;
    }
    
    public long getNumberArticle() {
        return numberArticle;
    }
    
    public void setNumberArticle(long numberArticle) {
        this.numberArticle = numberArticle;
    }
    
    
    // Deserialize the object's fields from the data stream.
    // Fields must be read in the same order they were written in write(DataOutput).
    @Override
    public void readFields(DataInput in) throws IOException {
        userName = in.readUTF();
        numberRead = in.readLong();
        numberArticle = in.readLong();
    }
    
    // Serialize the object's fields into the stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(userName);
        out.writeLong(numberRead);
        out.writeLong(numberArticle);
    }
    // Sort descending by numberRead; Long.compare also returns 0 for equal
    // values, which the compareTo contract requires
    @Override
    public int compareTo(FlowBean o) {
        return Long.compare(o.getNumberRead(), numberRead);
    }
    
    
    @Override
    public String toString() {
        return userName + "\t" + numberRead + "\t" + numberArticle;
    }

    @Override
    public void readFields(ResultSet arg0) throws SQLException {
        this.userName = arg0.getString(1);
        this.numberRead = arg0.getLong(2);
        this.numberArticle = arg0.getLong(3);
    }

    @Override
    public void write(PreparedStatement arg0) throws SQLException {
        arg0.setString(1, this.userName);
        arg0.setLong(2, this.numberRead);
        arg0.setLong(3, this.numberArticle);
    }
    
}
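
Since readFields(DataInput) must consume fields in exactly the order write(DataOutput) produced them, a quick local round-trip check can catch ordering mistakes before the job runs. This sketch is not part of the original post; the class name is hypothetical.

package flowsum;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

// Hypothetical sanity check: serialize a FlowBean and read it back,
// verifying that write() and readFields(DataInput) agree on field order.
public class FlowBeanRoundTrip {
    public static void main(String[] args) throws Exception {
        FlowBean original = new FlowBean("alice", 100L, 3L);

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy);  // prints the fields tab-separated: alice, 100, 3
    }
}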

 

Create the driver class with the mapper and reducer:

package student;

import java.io.IOException;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;

import flowsum.FlowBean;
import myTest.DBRecord;

public class SunDB {

    public static class SunMapper extends Mapper<LongWritable, DBRecord, Text, FlowBean> {
        @Override
        protected void map(LongWritable key, DBRecord value,
                Mapper<LongWritable, DBRecord, Text, FlowBean>.Context context)
                throws IOException, InterruptedException {
            // Each input record is one row of the source table; emit
            // (author, FlowBean) with an article count of 1 per row
            String userName = value.getAuthor();
            long numberRead = value.getNumberRead();
            FlowBean bean = new FlowBean(userName, numberRead, 1);
            context.write(new Text(userName), bean);
            System.out.println(bean);
        }
    }
    
    public static class SunReducer extends Reducer<Text, FlowBean, FlowBean, Text> {
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values,
                Reducer<Text, FlowBean, FlowBean, Text>.Context context)
                throws IOException, InterruptedException {
            long numberReadCount = 0;
            long numberArticleCount = 0;
            // Sum the read count and article count for each author
            for (FlowBean bean : values) {
                numberReadCount += bean.getNumberRead();
                numberArticleCount += bean.getNumberArticle();
            }
            FlowBean bean = new FlowBean(key.toString(), numberReadCount, numberArticleCount);
            // The FlowBean key is what DBOutputFormat writes to the flow table
            context.write(bean, new Text());
            System.out.println(bean);
        }
    }
    
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
         DBConfiguration.configureDB(configuration, "com.mysql.jdbc.Driver",
                 "jdbc:mysql://127.0.0.1:3306/sun_test?characterEncoding=utf8&useSSL=false", "root", "Sun@123");
         String [] fields = {"title", "author", "numberRead", "url"};
         final Job job = Job.getInstance(configuration, "yaya");
         job.setJarByClass(SunDB.class);
         job.setInputFormatClass(DBInputFormat.class);
         DBInputFormat.setInput(job, DBRecord.class, "sourcess", null, null, fields);
         job.setMapperClass(SunMapper.class);
         job.setReducerClass(SunReducer.class);
         
         job.setNumReduceTasks(1);
         // Map output types differ from the final output types, so both must be set:
         // the mapper emits (Text, FlowBean), while the reducer emits (FlowBean, Text)
         // and DBOutputFormat writes the FlowBean key to the database
         job.setMapOutputKeyClass(Text.class);
         job.setMapOutputValueClass(FlowBean.class);
         job.setOutputKeyClass(FlowBean.class);
         job.setOutputValueClass(Text.class);
         DBOutputFormat.setOutput(job, "flow","userName","numberRead", "numberArticle");
         job.waitForCompletion(true);
    }
}
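
Note that the MySQL connector JAR typically needs to be on the classpath of both the client and the map/reduce tasks. Once the job completes, the per-author totals can be checked directly over JDBC. A minimal sketch, assuming the same connection settings as the job (the class name is hypothetical):

package student;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Hypothetical check: print the aggregated rows that DBOutputFormat wrote.
public class CheckFlowTable {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:3306/sun_test?characterEncoding=utf8&useSSL=false",
                "root", "Sun@123");
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery(
                 "SELECT userName, numberRead, numberArticle FROM flow")) {
            while (rs.next()) {
                System.out.println(rs.getString(1) + "\t" + rs.getLong(2) + "\t" + rs.getLong(3));
            }
        }
    }
}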