Possible serialization error when executing a Spark job on a cluster with 1 master and 2 workers

Time: 2022-10-19 23:08:32

I am executing a Spark job on a cluster. While it runs perfectly in local mode, on the cluster it fails at the following line:

val labelsAndPredictions = testData.map(p => (model.predict(p.features), p.label))

I understand that the issue might be related to the fact that the model object is inaccessible to the workers. Therefore I tried another approach: I created a serializable object, Utils, with two functions:

import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel

object Utils {

  var model: GradientBoostedTreesModel = null

  def loadModel(m: GradientBoostedTreesModel): Unit = {
    model = m
  }

  def makePrediction(features: org.apache.spark.mllib.linalg.Vector): Double = {
    println(model.trees)
    val predictedLabel = model.predict(features)
    println("predictedLabel: " + predictedLabel)
    predictedLabel
  }

}

Then I substituted the prediction as follows:

  model = GradientBoostedTrees.train(trainingData, boostingStrategy)
  Utils.loadModel(model)

  val labelsAndPredictions = testData.map { x =>
    val prediction = Utils.makePrediction(x.features)
    (x.label, prediction)
  }

This is the command that I use to submit the Spark job:

spark-submit --master yarn --deploy-mode cluster --driver-memory 10g --executor-memory 10g --num-executors 2 --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+AlwaysPreTouch" --class org.test.Runner s3://mybucket/keys/trainer.jar

1 answer

#1

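The Utils object won't solve the problem, because its model field is static and is initialised only on the driver, not on the workers. A Scala object is a singleton per JVM: Utils.loadModel(model) runs in the driver JVM, so on each executor Utils is initialised afresh and model is still null when makePrediction is called.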
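To make the per-JVM point concrete, here is a toy, Spark-free sketch. It assumes, purely for illustration, that each JVM's copy of the Utils singleton can be modeled as a separate instance of a hypothetical class UtilsState, and it uses a String in place of GradientBoostedTreesModel so it runs without Spark on the classpath.

```scala
// Toy simulation (illustrative assumption, not Spark internals): each JVM gets
// its own copy of a Scala `object`, modeled here as a separate instance of the
// hypothetical class UtilsState.
final class UtilsState {
  // Stands in for `var model: GradientBoostedTreesModel = null` in Utils
  var model: Option[String] = None

  def loadModel(m: String): Unit = { model = Some(m) }
}

object SingletonPerJvmDemo {
  def main(args: Array[String]): Unit = {
    val driverCopy = new UtilsState   // Utils as seen by the driver JVM
    val executorCopy = new UtilsState // Utils as seen by an executor JVM

    // Utils.loadModel(model) is only ever called on the driver
    driverCopy.loadModel("trained GBT model")

    println(s"driver sees:   ${driverCopy.model}")   // driver sees:   Some(trained GBT model)
    println(s"executor sees: ${executorCopy.model}") // executor sees: None
  }
}
```

Nothing ever calls loadModel on the executor's copy, which is why makePrediction finds no model there.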

The last time I checked (1.6.x), models were indeed not serializable.

What you can do is:

model.predict(testData.map(_.features))

But that gives you only the predictions, without their labels, so it doesn't offer a nice way to assess the model.

Or

testData.toLocalIterator.map { x => (x.label, model.predict(x.features)) }

toLocalIterator fetches the data one partition at a time and iterates over it on the driver rather than on the workers, which gives you access to your model. Unfortunately, this means the predictions are not computed in parallel.
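Once toLocalIterator has brought the (label, prediction) pairs to the driver, you can evaluate them there without collecting the whole test set into memory at once. A minimal sketch, assuming mean squared error as the metric; LocalEvaluation and meanSquaredError are hypothetical names, and a plain Scala iterator of tuples stands in for the Spark one so the example runs without Spark.

```scala
object LocalEvaluation {
  // Hypothetical helper: mean squared error over (label, prediction) pairs,
  // computed in a single pass so pairs are consumed one at a time.
  def meanSquaredError(labelsAndPredictions: Iterator[(Double, Double)]): Double = {
    var sumSq = 0.0
    var n = 0L
    for ((label, prediction) <- labelsAndPredictions) {
      val err = label - prediction
      sumSq += err * err
      n += 1
    }
    require(n > 0, "no (label, prediction) pairs to evaluate")
    sumSq / n
  }

  def main(args: Array[String]): Unit = {
    // In the Spark job this iterator would come from
    // testData.toLocalIterator.map { x => (x.label, model.predict(x.features)) }
    val pairs = Iterator((1.0, 1.0), (0.0, 1.0), (2.0, 1.0), (3.0, 1.0))
    println(meanSquaredError(pairs)) // prints 1.5
  }
}
```

Because the helper only walks the iterator once, it keeps the driver's memory use bounded by one partition at a time, matching how toLocalIterator delivers the data.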
