PySpark logistic regression program

Date: 2023-12-14 18:58:26

http://www.qqcourse.com/forum.php?mod=viewthread&tid=3688

[Very important]: http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html

The official docs list every model-configuration parameter.

[Spark DataFrame and pandas data structures]: http://blog.csdn.net/chaoran_liu/article/details/52203831

[Very important] Complete Pipeline / DataFrame-based model program: http://blog.csdn.net/u013719780/article/details/52277616

[Comparison of three models: logistic regression, decision tree, random forest]: http://blog.csdn.net/chaoran_liu/article/details/52203831

Building a machine-learning workflow with ML Pipeline: https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice5/

[Image URL]: http://spark.apache.org/docs/latest/api/python/pyspark.ml.html


# This program runs on a Hadoop cluster, inside the pyspark shell

pyspark --master yarn-client --executor-memory 5G --num-executors 50   # launch the shell: 5 GB of memory per executor, 50 executors

from pyspark import SparkContext
from pyspark.mllib.classification import LogisticRegressionWithLBFGS,LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint

def parsePoint(line):
    values = [float(x) for x in line.split('\t')]
    return LabeledPoint(values[0], values[1:])

data1 = sc.textFile('1029_IOS_features_age_18t24')  # the file must live on the cluster's default HDFS path, hdfs://getui-bi-hadoop/user/zhujx

parsedata = data1.map(parsePoint)  # convert each line into LabeledPoint format
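To see what `parsePoint` does to each input line without a cluster, here is a pure-Python sketch of the same parsing logic (the example line is made up; the real file is tab-separated with the label in the first column):

```python
# Sketch of the parsePoint logic: the first tab-separated field is the
# label, the remaining fields are the feature vector.
def parse_point(line):
    values = [float(x) for x in line.split('\t')]
    return values[0], values[1:]

label, features = parse_point("1\t0.5\t2.0\t3.5")
# label is 1.0, features is [0.5, 2.0, 3.5]
```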

#build model

modelage18 = LogisticRegressionWithLBFGS.train(parsedata, regType="l1")  # L1 regularization
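Roughly speaking, the trained binary model scores a point by taking the dot product of the learned weights with the features, passing it through the sigmoid, and thresholding at 0.5 (MLlib's default threshold). A hedged local sketch of that scoring rule, with made-up weights:

```python
import math

# Rough sketch of binary logistic-regression prediction:
# probability = sigmoid(w . x + intercept), then threshold at 0.5.
def predict(weights, intercept, features, threshold=0.5):
    margin = sum(w * x for w, x in zip(weights, features)) + intercept
    prob = 1.0 / (1.0 + math.exp(-margin))
    return 1 if prob > threshold else 0
```

This is only an illustration of the decision rule, not MLlib's actual implementation.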

#print(modelage18.weights)

labepreds = parsedata.map(lambda p: (p.label, modelage18.predict(p.features)))  # pairs each true label with the predicted label; this is how to score an RDD of LabeledPoint
#labepreds2 = train1.map(lambda p: (p.label, model.predict(p.features)))  # score a held-out set

trainerro = labepreds.filter(lambda vp: vp[0] != vp[1]).count() / float(parsedata.count())  # training error rate
prerat = labepreds.filter(lambda vp: vp[0] == vp[1]).count() / float(parsedata.count())     # training accuracy

crosstable1 = labepreds.filter(lambda vp: vp[1] == 1).count()
crosstable0 = labepreds.filter(lambda vp: vp[1] == 0).count()

crosstable11 = labepreds.filter(lambda vp: vp[1] == 1 and vp[0] == 1).count()  # predicted 1 and actually 1
crosstable10 = labepreds.filter(lambda vp: vp[1] == 1 and vp[0] == 0).count()
crosstable01 = labepreds.filter(lambda vp: vp[1] == 0 and vp[0] == 1).count()
crosstable00 = labepreds.filter(lambda vp: vp[1] == 0 and vp[0] == 0).count()

print("train err = " + str(trainerro))
print("11: " + str(crosstable11), "10: " + str(crosstable10), "01: " + str(crosstable01), "00: " + str(crosstable00))
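The four filter/count passes above build a 2x2 confusion matrix. Locally, the same table and the accuracy can be computed in a single pass over (label, prediction) pairs; the sample pairs here are made up:

```python
from collections import Counter

# (true label, predicted label) pairs, as produced by labepreds
pairs = [(1.0, 1), (1.0, 0), (0.0, 0), (0.0, 0), (1.0, 1)]

# one pass instead of four filter().count() jobs
counts = Counter((int(v), p) for v, p in pairs)
accuracy = sum(n for (v, p), n in counts.items() if v == p) / len(pairs)
```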

# Save the model, then reload it
modelage18.save(sc,"target/tmp/LR_age18-24")
sameModel = LogisticRegressionModel.load(sc,"target/tmp/LR_age18-24")

***********************Split the sample into training and test sets*******************

splits = parsedata.randomSplit((0.7, 0.3))
trainingData = splits[0]
testData = splits[1]  # held out to measure accuracy
model_train = LogisticRegressionWithLBFGS.train(trainingData, regType="l1", intercept=False)

print(model_train.weights)

labelsAndPreds = testData.map(lambda p: (p.label, model_train.predict(p.features)))
testErr = labelsAndPreds.filter(lambda vp: vp[0] != vp[1]).count() / float(testData.count())  # misclassification rate on the held-out test set
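`randomSplit((0.7, 0.3))` assigns each record to a split independently at random, so the 70/30 proportions are approximate, not exact. A hedged local sketch of that behavior (the helper name and seed are my own, not Spark API):

```python
import random

# Sketch of a randomSplit-style 70/30 holdout: each record independently
# goes to the training set with probability `fraction`.
def random_split(records, fraction=0.7, seed=42):
    rng = random.Random(seed)
    train, test = [], []
    for r in records:
        (train if rng.random() < fraction else test).append(r)
    return train, test

train, test = random_split(list(range(1000)))
# len(train) is close to, but usually not exactly, 700
```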