spark源码分析之二分类逻辑回归evaluation

时间:2022-06-01 18:30:13

在逻辑回归分类中,我们评价分类器好坏的主要指标有精准率(precision),召回率(recall),F-measure,AUC等,其中最常用的是AUC,它可以综合评价分类器性能,其他的指标主要偏重一些方面。我们介绍下spark中实现的这些评价指标,便于使用spark训练模型后,对训练结果进行评估。

1. 评价指标

1.1. 混淆矩阵

混淆矩阵(confusion matrix)用一张简单的表格,反应分类器对样本分类的情况

实际\预测 1 0
1 TP(True Positive) FN(Flase Negtive)
0 FP(False Positive) TN(True Negtive)

0/1代表两类样本,下面解释下表格中的含义

  • TP:真阳性,预测是1,实际也是1
  • FP:假阳性,预测是1,实际是0
  • TN:真阴性,预测是0,实际也是0
  • FN:假阴性,预测是0,实际是1

不难看出,这个矩阵一条对角线上带T的是预测正确的样本(数量),另外一条对角线上带F的是预测错误的样本。

1.2. 基础指标

由这个矩阵,我们可以计算一系列衡量分类器性能的指标

  • 准确率(Accuracy Rate)

(TP+TN)/(TP+FP+TN+FN)
分类器分对的样本在总样本中的比例

  • 精准度(Precision)

TP/(TP+FP)(1)
真正的正样本在分类器分出的正样本中的比例

  • 召回率(Recall)

TP/(TP+FN)(2)
样本中正例被正确分类的比例

  • TPR(True Positive Rate),同召回率
  • FPR(False Positive Rate)

FP/(FP+TN)(3)
被错误分成正例的样本在实际负例样本中的比例

1.3. F-measure

也称F-score,综合考虑precision和recall,经常用在信息检索中

Fβ=(β2+1)PRβ2P+R(4)
β=1 时,就是F1-score

1.4. ROC

样本经过分类器后,我们可以得到样本的预测值,以这些预测值为阈值,就可以得到这些预测值对应的的混淆矩阵,每个混淆矩阵都可以计算(FPR, TPR)这样的点对,将这些点对绘制在二维坐标系中,然后连起来就得到了ROC曲线
spark源码分析之二分类逻辑回归evaluation
显然坐标(1, 0)是所有正例全部分错,是最坏的情况,坐标(0, 1)是正例全部分对,是最好的情况,而 y=x 这条线代表了随机猜测的情况,因此正常的分类器的ROC曲线应该是高于这条直线的。

1.5. AUC

ROC是条曲线,不方便我们对比分类器的好坏,因此我们用ROC覆盖的面积这样一个数值来衡量分类器,AUC的计算方法主要有两种,一种用相邻两点构成的等腰梯形近似计算,另外一种利用与Wilcoxon-Mann-Witney Test等价关系计算。

1.5.1. 直角梯形法

如1.3中的图所示,ROC曲线上的两个相邻点 (x1,x2),(y1,y2) ,以及它们在x轴上的投影构成了一个直角梯形,当两个点足够接近时,可以近似为两点之间曲线下的面积
s=(y1x1)(x2+y2)/2(5)
将ROC曲线上的点依次组成这种对,连续计算相邻两点形成的直角梯形并累加即可得到近似的AUC值。

1.5.2. Wilcoxon-Mann-Witney Test

AUC和Wilcoxon-Mann-Witney Test是等价的,而Wilcoxon-Mann-Witney Test就是从样本中任意抽取一个正例本和一个负例,正例大于负例score的概率。具体计算这个概率可以通过统计所有的 正负样本对(M*N,M为正样本数量,N为负样本数量)中,正样本score大于负样本score的数量除以M*N来近似。如果这个pair的正负样本 score相等,则按0.5计算,这个方法的复杂度为 O((M+N)2) 。在此基础上,还有种改进方法,具体做法是将所有样本按score从大到小逆序排序,然后取所有正样本的排序次序 r 相加,
auc=positiveriM(M+1)/2MN(6)
这种方法下,如果某正例s的次序是 rk ,则算上这个样本,比它score小的样本数量就是 rk ,s与这些样本组成的pair对中,再去掉小于等于它的正样本就是需要计算的负样本的个数,而这些需要去掉的正样本数量则是 M (对应最大score的正例), M1 (对应score第二大的正例),依次类推,score最小的样本则对应1,也就是对应数列 M,M1,...,1 ,其和是 M(M+1)/2 ,分母上再除去 MN 即可。

2. 实现

2.1. BinaryLabelCounter

记录样本label的分布情况

private[evaluation] class BinaryLabelCounter(
var numPositives: Long = 0L,
var numNegatives: Long = 0L)

包含了正/负样本的数量
值得注意的是其运算中兼容了负例label为0/-1这两种情况,只要label小于等于0.5就认为是负例

def +=(label: Double): BinaryLabelCounter = { 
if (label > 0.5) numPositives += 1L else numNegatives += 1L
this
}

2.2. confusion matrix

count是大于当前score的样本的label分布,totalCount是所有的label的分布

private[evaluation] case class BinaryConfusionMatrixImpl(
count: BinaryLabelCounter,
totalCount: BinaryLabelCounter)
extends BinaryConfusionMatrix {


/** TP */
override def numTruePositives: Long = count.numPositives
/** FP */
override def numFalsePositives: Long = count.numNegatives
/** FN */
override def numFalseNegatives: Long = totalCount.numPositives - count.numPositives
/** TN */
override def numTrueNegatives: Long = totalCount.numNegatives - count.numNegatives
/** number of positives */
override def numPositives: Long = totalCount.numPositives
/** number of negatives */
override def numNegatives: Long = totalCount.numNegatives
}

2.3. 基础指标

包括precision,FPR,TPR(Recall),F-score,这些指标都定义成object,继承自BinaryClassificationMetricComputer基类,然后实现apply函数,可以不显式使用new,而类似函数形式来计算,好处是用在高阶函数的参数列表中,可以根据需要传入需要计算的指标,非常灵活,参见BinaryClassificationMetrics中createCurve函数的用法,计算逻辑都比较直观简单。

2.3.1. precision

private[evaluation] object Precision extends BinaryClassificationMetricComputer {
override def apply(c: BinaryConfusionMatrix): Double = {
val totalPositives = c.numTruePositives + c.numFalsePositives
if (totalPositives == 0) {
1.0
} else {
//式(1)
c.numTruePositives.toDouble / totalPositives
}
}
}

2.3.2. FPR

private[evaluation] object FalsePositiveRate extends BinaryClassificationMetricComputer {
override def apply(c: BinaryConfusionMatrix): Double = {
if (c.numNegatives == 0) {
0.0
} else {
//式(3)
c.numFalsePositives.toDouble / c.numNegatives
}
}
}

2.3.3. TPR(Recall)

private[evaluation] object Recall extends BinaryClassificationMetricComputer {
override def apply(c: BinaryConfusionMatrix): Double = {
if (c.numPositives == 0) {
0.0
} else {
//式(2)
c.numTruePositives.toDouble / c.numPositives
}
}
}

2.3.4. F-measure

private[evaluation] case class FMeasure(beta: Double) extends BinaryClassificationMetricComputer {
private val beta2 = beta * beta
override def apply(c: BinaryConfusionMatrix): Double = {
val precision = Precision(c)
val recall = Recall(c)
if (precision + recall == 0) {
0.0
} else {
//式(4)
(1.0 + beta2) * (precision * recall) / (beta2 * precision + recall)
}
}
}

3. BinaryClassificationMetrics

计算样本的分布,构造ROC曲线,计算AUC等二分类评估指标

class BinaryClassificationMetrics @Since("1.3.0") (
@Since("1.3.0") val scoreAndLabels: RDD[(Double, Double)],
@Since("1.3.0") val numBins: Int)

类成员为含有预测值(score, label) pair对的样本rdd,numBins是用于计算ROC时的用的点数,当样本数远大于numBins时则抽样,相当于对样本score做等频离散化。

3.1. label分布与混淆矩阵

计算样本各score(预测值)的累积label分布cumulativeCounts与混淆矩阵confusions

private lazy val ( 
cumulativeCounts: RDD[(Double, BinaryLabelCounter)],
confusions: RDD[(Double, BinaryConfusionMatrix)]) = {
// Create a bin for each distinct score value, count positives and negatives within each bin,
// and then sort by score values in descending order.
//将具有相同预测值的样本累计在一起并按降序排序,key是预测值,value是BinaryLabelCounter,累计正样本和负样本的个数
val counts = scoreAndLabels.combineByKey(
createCombiner = (label: Double) => new BinaryLabelCounter(0L, 0L) += label,
mergeValue = (c: BinaryLabelCounter, label: Double) => c += label,
mergeCombiners = (c1: BinaryLabelCounter, c2: BinaryLabelCounter) => c1 += c2
).sortByKey(ascending = false)
//抽样并排序
val binnedCounts =
// Only down-sample if bins is > 0
if (numBins == 0) {
// Use original directly
counts
} else {
val countsSize = counts.count()
// Group the iterator into chunks of about countsSize / numBins points,
// so that the resulting number of bins is about numBins
var grouping = countsSize / numBins
if (grouping < 2) {
// numBins was more than half of the size; no real point in down-sampling to bins
logInfo(s"Curve is too small ($countsSize) for $numBins bins to be useful")
counts
} else {
//样本个数大于2倍numBins,抽样
if (grouping >= Int.MaxValue) {
logWarning(
s"Curve too large ($countsSize) for $numBins bins; capping at ${Int.MaxValue}")
grouping = Int.MaxValue
}
//grouped是将迭代器每grouping个组成一个新的迭代器,例如[i1, i2, i3,...,i100],如果grouping为4,则[[i1,i2,i3,i4], [i5,i6,i7,i8], ...]
counts.mapPartitions(_.grouped(grouping.toInt).map { pairs =>
//取新组中的第一个分数为新的pair分数,相当于等频离散化
val firstScore = pairs.head._1
//累加组内的label计数
val agg = new BinaryLabelCounter()
pairs.foreach(pair => agg += pair._2)
//拼成新的pair,相当于抽样了
(firstScore, agg)
})
}
}
//按partition内累积
val agg = binnedCounts.values.mapPartitions { iter =>
val agg = new BinaryLabelCounter()
iter.foreach(agg += _)
Iterator(agg)
}.collect()
//part间累积
val partitionwiseCumulativeCounts =
agg.scanLeft(new BinaryLabelCounter())((agg, c) => agg.clone() += c)
val totalCount = partitionwiseCumulativeCounts.last
logInfo(s"Total counts: $totalCount")
//part内累积:每个score先整体累加前一个part,在累加part内其他score
val cumulativeCounts = binnedCounts.mapPartitionsWithIndex(
(index: Int, iter: Iterator[(Double, BinaryLabelCounter)]) => {
val cumCount = partitionwiseCumulativeCounts(index)
iter.map { case (score, c) =>
cumCount += c
(score, cumCount.clone())
}
}, preservesPartitioning = true)

cumulativeCounts.persist()
val confusions = cumulativeCounts.map { case (score, cumCount) =>

(score, BinaryConfusionMatrixImpl(cumCount, totalCount).asInstanceOf[BinaryConfusionMatrix])
}
(cumulativeCounts, confusions)
}

我们做ROC曲线,是一系列score以及在这个score下的混淆矩阵,其实需要的是以那个score为threshold的label分布。举例来说,如果有20个样本,其score集合[0.9, 0.8. 0.7, 0.6, 0.5],对应样本score:label的情况(key是label 0/1,value是数量,第一项代表score等于0.9的样本中类0有1个,类1有2个)[[0:1, 1:2], [0:0, 1:3], [0:2, 1:3], [0:4, 1:2], [0:3, 1:0]],总的分布是[0:10, 1:10],因此我们可以按序累积[[0:1, 1:2], [0:1, 1:5], [0:3, 1:8], [0:7, 1:10], [0:10, 1:10]],每一个都是累加前面的,这样我们在最小的值就可以得到所有的分布,当以0.8为threshold时,大于0.8的判定为1,其中label的分布就是列表中的分布[0:1, 1:5],判定为0的分布用总的分布减掉就是[0:9, 1:5],然后在计算混淆矩阵就非常容易了。
为了达到上面的目的,代码首先计算了每个score下的label分布情况,然后逆序按从大到小排序(当然按顺序排序也是可以,得到的就是小于这个score的分布了),考虑到数据是分布式存储在不同的机器上的,但因为整体有序(part间有序,part内有序),所有part1中的所有score肯定是大于part0中的,因此可以先按part累积,part内的元素再逐个累积,最后就可以得到每个score下的label分布,比较巧妙。

3.2. createCurve函数

函数的入参是BinaryClassificationMetricComputer,可以根据需要计算的指标,返回pair指标,参考2.3节

/** Creates a curve of (threshold, metric). */
private def createCurve(y: BinaryClassificationMetricComputer): RDD[(Double, Double)] = {
confusions.map { case (s, c) =>
(s, y(c))
}
}

/** Creates a curve of (metricX, metricY). */
private def createCurve(
x: BinaryClassificationMetricComputer,
y: BinaryClassificationMetricComputer): RDD[(Double, Double)] = {
confusions.map { case (_, c) =>
(x(c), y(c))
}
}

3.3. ROC

产生ROC曲线

def roc(): RDD[(Double, Double)] = { 
//(FPR, TPR)
val rocCurve = createCurve(FalsePositiveRate, Recall)
val sc = confusions.context
val first = sc.makeRDD(Seq((0.0, 0.0)), 1)
val last = sc.makeRDD(Seq((1.0, 1.0)), 1)
new UnionRDD[(Double, Double)](sc, Seq(first, rocCurve, last))
}

3.4. AUC

使用AreaUnderCurve计算AUC

def areaUnderROC(): Double = AreaUnderCurve.of(roc())

AreaUnderCurve使用直角梯形法计算曲线下的面积

//直角梯形的面积,式(5)
private def trapezoid(points: Seq[(Double, Double)]): Double = {
require(points.length == 2)
val x = points.head
val y = points.last
(y._1 - x._1) * (y._2 + x._2) / 2.0
}

计算相邻两点构成的直角梯形的面积,入参是包含两点的序列

def of(curve: RDD[(Double, Double)]): Double = { 
curve.sliding(2).aggregate(0.0)(
seqOp = (auc: Double, points: Array[(Double, Double)]) => auc + trapezoid(points),
combOp = _ + _
)
}

入参是ROC曲线,(FPR, TPR)对的RDD,每次滑动步长为1,窗口大小为2,构造包括相邻两点的数组,计算曲线下面积,然后累加得到整个曲线的面积。

3.5. (precision, recall)曲线

PR曲线

def pr(): RDD[(Double, Double)] = { 
val prCurve = createCurve(Recall, Precision)
val sc = confusions.context
val first = sc.makeRDD(Seq((0.0, 1.0)), 1)
first.union(prCurve)
}

曲线面积

def areaUnderPR(): Double = AreaUnderCurve.of(pr())

3.6. 其他

score作为阈值(threshold)时,与其他指标构成的曲线,包括(threshold, F-measure),(threshold, precision),(threshold, recall),是要使用createCurve函数。

4. 结语

我们介绍了二分类的一些常用评价指标及在spark中的实现,其中的难点主要是label分布的分布式统计,以及spark AUC的计算方式。