spark-MLlib之线性回归

时间:2021-07-17 13:36:35

>>提君博客原创  http://www.cnblogs.com/tijun/  <<

假定线性拟合方程:

提君博客原创

spark-MLlib之线性回归

变量 X是 i 个变量或者说属性 

参数 ai 是模型训练的目的就是计算出这些参数的值。

线性回归分析的整个过程可以简单描述为如下三个步骤:

  1. 寻找合适的预测函数,即上文中的 h(x)h(x) ,用来预测输入数据的判断结果。这个过程时非常关键的,需要对数据有一定的了解或分析,知道或者猜测预测函数的“大概”形式,比如是线性函数还是非线性函数,若是非线性的则无法用线性回归来得出高质量的结果。
  2. 构造一个Loss函数(损失函数),该函数表示预测的输出(h)与训练数据标签之间的偏差,可以是二者之间的差(h-y)或者是其他的形式(如平方差开方)。综合考虑所有训练数据的“损失”,将Loss求和或者求平均,记为 J(θ)J(θ) 函数,表示所有训练数据预测值与实际类别的偏差。
  3. 显然, J(θ)J(θ) 函数的值越小表示预测函数越准确(即h函数越准确),所以这一步需要做的是找到 J(θ)J(θ) 函数的最小值。找函数的最小值有不同的方法,Spark中采用的是梯度下降法(stochastic gradient descent, SGD)。

线性回归同样可以采用正则化手段,其主要目的就是防止过拟合。

当采用L1正则化时,则变成了Lasso Regresion;当采用L2正则化时,则变成了Ridge Regression;线性回归未采用正则化手段。通常来说,在训练模型时是建议采用正则化手段的,特别是在训练数据的量特别少的时候,若不采用正则化手段,过拟合现象会非常严重。L2正则化相比L1而言会更容易收敛(迭代次数少),但L1可以解决训练数据量小于维度的问题(也就是n元一次方程只有不到n个表达式,这种情况下是多解或无穷解的)。

提君博客原创

在spark中分三种回归:LinearRegression、Lasso和RidgeRegression(岭回归)

采用L1正则化时为Lasso回归(元素绝对值),采用L2时为RidgeRegression回归(元素平方),没有正则化时就是线性回归。

比如岭回归的损失函数: 
spark-MLlib之线性回归

显然,损失函数值越小说明当前这条直线拟合效果越好>>提君博客原创  http://www.cnblogs.com/tijun/  <<
通常用梯度下降法 用来最小化损失值? 

spark中线性回归算法可使用的类包括LinearRegression、LassoWithSGD、RidgeRegressionWithSGD(SGD代表随机梯度下降法),

这几个类都有几个可以用来对算法调优的参数

  • numIterations 要迭代的次数
  • stepSize 梯度下降的步长(默认1.0)
  • intercept 是否给数据加上一个干扰特征或者偏差特征(默认:false)
  • regParam Lasso和ridge的正规参数(默认1.0)

下面是实例>>提君博客原创  http://www.cnblogs.com/tijun/  <<

训练集下载

训练集概况

-0.4307829,-1.63735562648104 -2.00621178480549 -1.86242597251066 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
-0.1625189,-1.98898046126935 -0.722008756122123 -0.787896192088153 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
-0.1625189,-1.57881887548545 -2.1887840293994 1.36116336875686 -1.02470580167082 -0.522940888712441 -0.863171185425945 0.342627053981254 -0.155348103855541
...

数据格式:逗号之前为label;之后为8个特征值,以空格分隔。

代码

package com.ltt.spark.ml.example;

import org.apache.spark.api.java.*;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.GeneralizedLinearModel;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.regression.LassoModel;
import org.apache.spark.mllib.regression.LassoWithSGD;
import org.apache.spark.mllib.regression.LinearRegressionModel;
import org.apache.spark.mllib.regression.LinearRegressionWithSGD;
import org.apache.spark.mllib.regression.RidgeRegressionModel;
import org.apache.spark.mllib.regression.RidgeRegressionWithSGD; import java.util.Arrays; import org.apache.spark.SparkConf;
import scala.Tuple2; /**
*
* Title: LinearRegresionExample.java
* Description: 本地代码执行,机器学习之线性回归
* <br/>
* @author liutiti
* @created 2017年11月21日 下午4:03:45
*/
@SuppressWarnings("resource")
public class LinearRegresionExample { /**
*
* @discription 程序测试入口
* @author liutiti
* @created 2017年11月21日 上午4:03:45
* @param args
*/
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("LinearRegresion").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
//原始的数据-0.4307829,-1.63735562648104 -2.00621178480549 ...
JavaRDD<String> data = sc.textFile("E:\\spark-ml-data\\lpsa.txt"); //转换数据格式:把每一行原始的数据(num1,num2 num3 ...)转换成LabeledPoint(label, features)
JavaRDD<LabeledPoint> parsedData = data.filter(line -> { //过滤掉不符合的数据行
if(line.length() > 3)
return true;
return false;
}).map(line -> { //读取转换成LabeledPoint
String[] parts = line.split(","); //逗号分隔
double[] ds = Arrays.stream(parts[1].split(" ")) //空格分隔
.mapToDouble(Double::parseDouble)
.toArray();
return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(ds));
});
//rdd持久化内存中,后边反复使用,不必再从磁盘加载
parsedData.cache(); //设置迭代次数
int numIterations = 100;
//三种模型进行训练
LinearRegressionModel linearModel = LinearRegressionWithSGD.train(parsedData.rdd(), numIterations);
RidgeRegressionModel ridgeModel = RidgeRegressionWithSGD.train(parsedData.rdd(), numIterations);
LassoModel lassoModel = LassoWithSGD.train(parsedData.rdd(), numIterations);
//打印信息
print(parsedData, linearModel);
print(parsedData, ridgeModel);
print(parsedData, lassoModel); //预测一条新数据方法,8个特征值
double[] d = new double[]{1.0, 1.0, 2.0, 1.0, 3.0, -1.0, 1.0, -2.0};
Vector v = Vectors.dense(d);
System.out.println("Prediction of linear: "+linearModel.predict(v));
System.out.println("Prediction of ridge: "+ridgeModel.predict(v));
System.out.println("Prediction of lasso: "+lassoModel.predict(v)); // //保存模型
// model.save(sc.sc(),"myModelPath" );
// //加载模型
// LinearRegressionModel sameModel = LinearRegressionModel.load(sc.sc(), "myModelPath"); //关闭
sc.close();
} /**
*
* @discription 统一输出方法
* @author liutiti
* @created 2017年11月22日 上午10:00:27
* @param parsedData
* @param model
*/
public static void print(JavaRDD<LabeledPoint> parsedData, GeneralizedLinearModel model) {
JavaPairRDD<Double, Double> valuesAndPreds = parsedData.mapToPair(point -> {
double prediction = model.predict(point.features()); //用模型预测训练数据
return new Tuple2<>(point.label(), prediction);
});
//打印训练集中的真实值与相对应的预测值
valuesAndPreds.foreach((Tuple2<Double, Double> t) -> {
System.out.println("训练集真实值:"+t._1()+" ,预测值: "+t._2());
});
//计算预测值与实际值差值的平方值的均值
Double MSE = valuesAndPreds.mapToDouble((Tuple2<Double, Double> t) -> Math.pow(t._1() - t._2(), 2)).mean();
System.out.println(model.getClass().getName() + " training Mean Squared Error = " + MSE);
}
}

提君博客原创

>>提君博客原创  http://www.cnblogs.com/tijun/  <<

spark官方java api 文档