本章主要讲述MLlib包里面的分类算法实现,目前实现的有LogisticRegression、SVM、NaiveBayes ,前两种算法针对各自的目标优化函数跟正则项,调用了Optimization模块下的随机梯度的优化,并行实现的策略主要在随机梯度的计算,而贝叶斯的的并行策略主要是计算类别的先验概率跟特征的条件概率上面,详细情况如下

LogisticRegression.scala文件

第一部分 LogisticRegressionModel 类
 1 /**
 2 
 3  * Classification model trained using Logistic Regression.
 4 
 5  *
 6 
 7  * @param weights Weights computed for every feature.
 8 
 9  * @param intercept Intercept computed for this model.
10 
11  */
12 
13 class LogisticRegressionModel(
14 
15     override val weights: Array[Double],
16 
17     override val intercept: Double)
18 
19   extends GeneralizedLinearModel(weights, intercept)
20 
21   with ClassificationModel with Serializable {
22 
23   override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
24 
25       intercept: Double) = {
26 
27     val margin = dataMatrix.mmul(weightMatrix).get(0) + intercept
28 
29     round(1.0/ (1.0 + math.exp(margin * -1)))
30 
31   }
32 
33 }

       逻辑回归的predictPoint函数,函数输入:待预测的数据样本,回归系数weights,intercept截距项,由于逻辑回归的判别函数f=1/(1+exp(-wx)),在代码中margin=-wx,最后返回1/(1+exp(-wx))值的四舍五入,也就是预测标签。

第二部分 LogisticRegressionWithSGD 
 1 class LogisticRegressionWithSGD private (
 2 
 3     var stepSize: Double,
 4 
 5     var numIterations: Int,
 6 
 7     var regParam: Double,
 8 
 9     var miniBatchFraction: Double)
10 
11   extends GeneralizedLinearAlgorithm[LogisticRegressionModel]
12 
13   with Serializable {
14 
15   val gradient = new LogisticGradient()
16 
17   val updater = new SimpleUpdater()
18 
19   override val optimizer = new GradientDescent(gradient, updater)
20 
21       .setStepSize(stepSize)
22 
23       .setNumIterations(numIterations)
24 
25       .setRegParam(regParam)
26 
27       .setMiniBatchFraction(miniBatchFraction)
28 
29   override val validators = List(DataValidators.classificationLabels)
30 
31   /**
32 
33    * Construct a LogisticRegression object with default parameters
34 
35    */
36 
37   def this() = this(1.0, 100, 0.0, 1.0)
38 
39   def createModel(weights: Array[Double], intercept: Double) = {
40 
41     new LogisticRegressionModel(weights, intercept)
42 
43   }
44 
45 }

       源代码 先定义了gradient,updater实例(在optimization文件下下面),其中损失函数用了log-loss,没有用正则项参数,接着重写optimizer 优化算子,最后对该类成员变量stepSize,numIterations,regParam,miniBatchFraction设置默认数值。

 
第三部分 LogisticRegressionWithSGD上层接口
  1 object LogisticRegressionWithSGD {
  2 
  3  def train(
  4 
  5       input: RDD[LabeledPoint],
  6 
  7       numIterations: Int,
  8 
  9       stepSize: Double,
 10 
 11       miniBatchFraction: Double,
 12 
 13       initialWeights: Array[Double])
 14 
 15     : LogisticRegressionModel =
 16 
 17   {
 18 
 19     new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction).run(
 20 
 21       input, initialWeights)
 22 
 23   }
 24 
 25   def train(
 26 
 27       input: RDD[LabeledPoint],
 28 
 29       numIterations: Int,
 30 
 31       stepSize: Double,
 32 
 33       miniBatchFraction: Double)
 34 
 35     : LogisticRegressionModel =
 36 
 37   {
 38 
 39     new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction).run(
 40 
 41       input)
 42 
 43   }
 44 
 45   def train(
 46 
 47       input: RDD[LabeledPoint],
 48 
 49       numIterations: Int,
 50 
 51       stepSize: Double)
 52 
 53     : LogisticRegressionModel =
 54 
 55   {
 56 
 57     train(input, numIterations, stepSize, 1.0)
 58 
 59   }
 60 

 61   def train(
 62 
 63       input: RDD[LabeledPoint],
 64 
 65       numIterations: Int)
 66 
 67     : LogisticRegressionModel =
 68 
 69   {
 70 
 71     train(input, numIterations, 1.0, 1.0)
 72 
 73   }
 74 
 75   def main(args: Array[String]) {
 76 
 77     if (args.length != 4) {
 78 
 79       println("Usage: LogisticRegression <master> <input_dir> <step_size> " +
 80 
 81         "<niters>")
 82 
 83       System.exit(1)
 84 
 85     }
 86 
 87     val sc = new SparkContext(args(0), "LogisticRegression")
 88 
 89     val data = MLUtils.loadLabeledData(sc, args(1))
 90 
 91     val model = LogisticRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble)
 92 
 93     println("Weights: " + model.weights.mkString("[", ", ", "]"))
 94 
 95     println("Intercept: " + model.intercept)
 96 
 97     sc.stop()
 98 
 99   }
100 
101 }

     代码中,根据不同的输入定义了4种train的方式,在main函数里面,用到了MLUtils.loadLabeledData(sc,args(1)),该函数把文件输入<标签>,<特征1>,<特征2>...转换成定义的RDD[LabeledPoint]形式。接着调用LR进行训练,最后打印回归系数跟截距项

 
SVM.scala文件
第一部分 SVMModel 
 1 class SVMModel(
 2 
 3     override val weights: Array[Double],
 4 
 5     override val intercept: Double)
 6 
 7   extends GeneralizedLinearModel(weights, intercept)
 8 
 9   with ClassificationModel with Serializable {
10 
11  
12 
13   override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
14 
15       intercept: Double) = {
16 
17     val margin = dataMatrix.dot(weightMatrix) + intercept
18 
19     if (margin < 0) 0.0 else 1.0
20 
21   }
22 
23 }

 

跟LR类似,只不过这里面的margin换成了:WX+b的形式
 
第二部分 SVMWithSGD  
 1 class SVMWithSGD private (
 2 
 3     var stepSize: Double,
 4 
 5     var numIterations: Int,
 6 
 7     var regParam: Double,
 8 
 9     var miniBatchFraction: Double)
10 
11   extends GeneralizedLinearAlgorithm[SVMModel] with Serializable {
12 
13  
14 
15   val gradient = new HingeGradient()
16 
17   val updater = new SquaredL2Updater()
18 
19   override val optimizer = new GradientDescent(gradient, updater)
20 
21     .setStepSize(stepSize)
22 
23     .setNumIterations(numIterations)
24 
25     .setRegParam(regParam)
26 
27     .setMiniBatchFraction(miniBatchFraction)
28 
29   override val validators = List(DataValidators.classificationLabels)
30 
31   def this() = this(1.0, 100, 1.0, 1.0)
32 
33   def createModel(weights: Array[Double], intercept: Double) = {
34 
35     new SVMModel(weights, intercept)
36 
37   }
38 
39 }

跟LR类似,gradient 换成了对hinge-loss的求梯度,updater换成了对L2正则 

 
第三部分 SVMWithSGD   上层接口
  1 object SVMWithSGD {
  2 
  3   def train(
  4 
  5       input: RDD[LabeledPoint],
  6 
  7       numIterations: Int,
  8 
  9       stepSize: Double,
 10 
 11       regParam: Double,
 12 
 13       miniBatchFraction: Double,
 14 
 15       initialWeights: Array[Double])
 16 
 17     : SVMModel =
 18 
 19   {
 20 
 21     new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input,
 22 
 23       initialWeights)
 24 
 25   }
 26 
 27   def train(
 28 
 29       input: RDD[LabeledPoint],
 30 
 31       numIterations: Int,
 32 
 33       stepSize: Double,
 34 
 35       regParam: Double,
 36 
 37       miniBatchFraction: Double)
 38 
 39     : SVMModel =
 40 
 41   {
 42 
 43     new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input)
 44 
 45   }
 46 
 47  
 48 
 49   def train(
 50 
 51       input: RDD[LabeledPoint],
 52 
 53       numIterations: Int,
 54 
 55       stepSize: Double,
 56 
 57       regParam: Double)
 58 
 59     : SVMModel =
 60 
 61   {
 62 
 63     train(input, numIterations, stepSize, regParam, 1.0)
 64 
 65   }
 66 
 67  
 68 
 69   def train(
 70 
 71       input: RDD[LabeledPoint],
 72 
 73       numIterations: Int)
 74 
 75     : SVMModel =
 76 
 77   {
 78 
 79     train(input, numIterations, 1.0, 1.0, 1.0)
 80 
 81   }
 82 
 83  
 84 
 85   def main(args: Array[String]) {
 86 
 87     if (args.length != 5) {
 88 
 89       println("Usage: SVM <master> <input_dir> <step_size> <regularization_parameter> <niters>")
 90 
 91       System.exit(1)
 92 
 93     }
 94 
 95     val sc = new SparkContext(args(0), "SVM")
 96 
 97     val data = MLUtils.loadLabeledData(sc, args(1))
 98 
 99     val model = SVMWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
100 
101     println("Weights: " + model.weights.mkString("[", ", ", "]"))
102 
103     println("Intercept: " + model.intercept)
104 
105  
106 
107     sc.stop()
108 
109   }
110 
111 }

 

跟LR类似
 
NaiveBayes.scala文件
第一部分 NaiveBayesModel 
 1 class NaiveBayesModel(val pi: Array[Double], val theta: Array[Array[Double]])
 2 
 3   extends ClassificationModel with Serializable {
 4 
 5  
 6 
 7   // Create a column vector that can be used for predictions
 8 
 9   private val _pi = new DoubleMatrix(pi.length, 1, pi: _*)
10 
11   private val _theta = new DoubleMatrix(theta)
12 
13  
14 
15   def predict(testData: RDD[Array[Double]]): RDD[Double] = testData.map(predict)
16 
17  
18 
19   def predict(testData: Array[Double]): Double = {
20 
21     val dataMatrix = new DoubleMatrix(testData.length, 1, testData: _*)
22 
23     val result = _pi.add(_theta.mmul(dataMatrix))
24 
25     result.argmax()
26 
27   }
28 
29 }

        朴素贝叶斯分类器,NaiveBayesModel的输入是:训练后得到的,标签类别先验概率pi (P(y=0),P(y=1),...,P(y=K)),特征属性在指定类别下出现的条件概率theta(P(x=1 / y)),对于特征转化为TF-IDF形式可以用来文本分类,当特征转化为0-1编码的时候,基于伯努利模型可以用来分类,第一个predict函数的输入是测试数据集,第二个predict函数的输入是单个测试样本。原本的贝叶斯定理是 根据P(y|x)~ P(x|y)P(y),这里实现的时候,是对两边取了对数,加法的计算效率比乘法更高,最后,返回result.argmax() 也就是后验概率最大的那个类别

 
第二部分 NaiveBayes  
 1 class NaiveBayes private (var lambda: Double)
 2 
 3   extends Serializable with Logging
 4 
 5 {
 6 
 7   def this() = this(1.0)
 8 
 9   /** Set the smoothing parameter. Default: 1.0. */
10 
11   def setLambda(lambda: Double): NaiveBayes = {
12 
13     this.lambda = lambda
14 
15     this
16 
17   }
18 
19  
20 
21   def run(data: RDD[LabeledPoint]) = {
22 
23     val zeroCombiner = mutable.Map.empty[Int, (Int, DoubleMatrix)]
24 
25     val aggregated = data.aggregate(zeroCombiner)({(combiner, point) =>
26 
27       point match {
28 
29         case LabeledPoint(label, features) =>
30 
31           val (count, featuresSum) = combiner.getOrElse(label.toInt, (0, DoubleMatrix.zeros(1)))
32 
33           val fs = new DoubleMatrix(features.length, 1, features: _*)
34 
35           combiner += label.toInt -> (count + 1, featuresSum.addi(fs))
36 
37       }
38 
39     }, { (lhs, rhs) =>
40 
41       for ((label, (c, fs)) <- rhs) {
42 
43         val (count, featuresSum) = lhs.getOrElse(label, (0, DoubleMatrix.zeros(1)))
44 
45         lhs(label) = (count + c, featuresSum.addi(fs))
46 
47       }
48 
49       lhs
50 
51     })
52 
53     // Kinds of label
54 
55     val C = aggregated.size
56 
57     // Total sample count
58 
59     val N = aggregated.values.map(_._1).sum
60 
61  
62 
63     val pi = new Array[Double](C)
64 
65     val theta = new Array[Array[Double]](C)
66 
67     val piLogDenom = math.log(N + C * lambda)
68 
69  
70 
71     for ((label, (count, fs)) <- aggregated) {
72 
73       val thetaLogDenom = math.log(fs.sum() + fs.length * lambda)
74 
75       pi(label) = math.log(count + lambda) - piLogDenom
76 
77       theta(label) = fs.toArray.map(f => math.log(f + lambda) - thetaLogDenom)
78 
79     }
80 
81     new NaiveBayesModel(pi, theta)
82 
83   }
84 
85 }

      这个类是实现贝叶斯算法,lambda参数是用来避免P(X|Y)=0的尴尬(学术界叫法:拉普拉斯平滑),核心代码在data.aggregate,首先定义了zeroCombiner这个map类型数据结构,key表示类别,value是(Int, DoubleMatrix)元组类型,Int表示该类别在训练集中的个数(以便求先验概率),DoubleMatrix表示各个特征在该类别下的条件概率

 
第三部分 NaiveBayes  调用接口
 1 object NaiveBayes {
 2 
 3   def train(input: RDD[LabeledPoint]): NaiveBayesModel = {
 4 
 5     new NaiveBayes().run(input)
 6 
 7   }
 8 
 9   def train(input: RDD[LabeledPoint], lambda: Double): NaiveBayesModel = {
10 
11     new NaiveBayes(lambda).run(input)
12 
13   }
14 
15  
16 
17   def main(args: Array[String]) {
18 
19     if (args.length != 2 && args.length != 3) {
20 
21       println("Usage: NaiveBayes <master> <input_dir> [<lambda>]")
22 
23       System.exit(1)
24 
25     }
26 
27     val sc = new SparkContext(args(0), "NaiveBayes")
28 
29     val data = MLUtils.loadLabeledData(sc, args(1))
30 
31     val model = if (args.length == 2) {
32 
33       NaiveBayes.train(data)
34 
35     } else {
36 
37       NaiveBayes.train(data, args(2).toDouble)
38 
39     }
40 
41     println("Pi: " + model.pi.mkString("[", ", ", "]"))
42 
43     println("Theta:\n" + model.theta.map(_.mkString("[", ", ", "]")).mkString("[", "\n ", "]"))
44 
45  
46 
47     sc.stop()
48 
49   }
50 
51 }

       贝叶斯训练方式分有无lambda参数,main函数先定义SparkContext,然后把数据集转化成RDD[LabelPoint]类型,经过训练,打印pi跟theta,最后八卦一下,这个算法是在Intel工作,微博名叫灵魂机器大神写的,可以follow他的github网址https://github.com/soulmachine