Spark1.6.2 java实现读取txt文件插入MySql数据库代码

时间:2022-12-17 15:03:29

package com.gosun.spark1;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class MySqlWriter {

public static void main(String[] args) throws ClassNotFoundException {
        
        long current = System.currentTimeMillis();
        //String master = "spark://192.168.31.34:7077";
        String localmaster = "local[5]";
        SparkConf sparkConf = new SparkConf().setAppName("mysqlJdbc").setMaster(localmaster);
        // spark应用的上下对象
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        SQLContext sqlContext = new SQLContext(sc);
        String url = "jdbc:mysql://192.168.31.16:3306/db?useUnicode=true&characterEncoding=utf-8";
        String table = "tb_user";
        Properties connectionProperties = new Properties();
        connectionProperties.put("user", "root");
        connectionProperties.put("password", "mysql");
        connectionProperties.put("driver", "com.mysql.jdbc.Driver");
        // 加载数据库中的数据
        JavaRDD<String> lines = sc.textFile( "F:/BaiduYunDownload/data/class9/user1.txt" );  
        JavaRDD<Row> personRDD  = lines.map(new Function<String, Row>() {  
              
            private static final long serialVersionUID = 1L;  
 
            public Row call( String line )  
                throws Exception {               
                String[] split = line.split("\t");  
                return RowFactory.create(String.valueOf(split[0]),String.valueOf(split[1]));  
            }  
        });  
        List<StructField> structFields = new ArrayList<StructField>();
        structFields.add(DataTypes.createStructField( "id", DataTypes.StringType, false ));
        structFields.add(DataTypes.createStructField( "name", DataTypes.StringType, true ));
        StructType structType = DataTypes.createStructType( structFields );  
        DataFrame usersDf = sqlContext.createDataFrame( personRDD, structType);  
        usersDf.write().mode(SaveMode.Append).mode(SaveMode.Overwrite).jdbc(url, table, connectionProperties);
        System.out.println((System.currentTimeMillis() - current) / 1000 + "s");
    }

}