scala - Exception: features must be of type org.apache.spark.ml.linalg.VectorUDT
apache-spark scala

Posted on 2020-03-29 13:11:04

I want to run PCA with KNN in Spark. I have a file containing id and features columns.

> KNN.printSchema
root
|-- id: int (nullable = true)
|-- features: double (nullable = true)

Code:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{PCA, VectorAssembler}
// KNNClassifier comes from the third-party spark-knn package, not core Spark
import org.apache.spark.ml.classification.KNNClassifier

val dataset = spark.read.parquet("/usr/local/spark/dataset/data/user")
val features = new VectorAssembler()
    .setInputCols(Array("id", "features" ))
    .setOutputCol("features")
val Array(train, test) = dataset
      .randomSplit(Array(0.7, 0.3), seed = 1234L)
      .map(_.cache())

//create PCA matrix to reduce feature dimensions
val pca = new PCA()
      .setInputCol("features")
      .setK(5)
      .setOutputCol("pcaFeatures")
val knn = new KNNClassifier()
      .setTopTreeSize(dataset.count().toInt / 5)
      .setFeaturesCol("pcaFeatures")
      .setPredictionCol("predicted")
      .setK(1)
val pipeline = new Pipeline()
      .setStages(Array(pca, knn))
      .fit(train)

The code block above throws this exception:

Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Column features must be of type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 but was actually ArrayType(DoubleType,true).
    at scala.Predef$.require(Predef.scala:224)
    at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:42)
    at org.apache.spark.ml.feature.PCAParams$class.validateAndTransformSchema(PCA.scala:54)
    at org.apache.spark.ml.feature.PCAModel.validateAndTransformSchema(PCA.scala:125)
    at org.apache.spark.ml.feature.PCAModel.transformSchema(PCA.scala:162)
    at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:180)
    at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:180)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
    at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)
    at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:180)
    at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:70)
    at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:132)
    at KNN$.main(KNN.scala:63)
    at KNN.main(KNN.scala)

Asked by salma R, viewed 288 times
Answered by Oli, 2020-02-01 00:12

Basically, you are trying to split your dataset into train and test sets, assemble the features, run a PCA, and then feed the result to a classifier to make predictions. The overall logic is correct, but there are a few problems with your code.

  1. PCA in Spark needs assembled features. You created a VectorAssembler, but you never use it in your code.
  2. You named the assembler's output `features`, and a column with that name already exists. Since the assembler is unused you do not see an error, but if you did use it, you would get this exception:
java.lang.IllegalArgumentException: Output column features already exists.
  3. When running a classification, you need to specify at least the input features with `setFeaturesCol` and the label you are trying to learn with `setLabelCol`. You did not specify a label, and by default the label column is named "label". Since you do not have any column named that way, Spark throws an exception.
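
As a side note on the exception text itself: it reports that the column is `ArrayType(DoubleType,true)` while PCA expects the ml `VectorUDT`. If your raw data really arrives as an array of doubles, you can also convert that column directly instead of assembling scalar columns. A minimal sketch, assuming Spark 3.1+ (where `array_to_vector` was added); the output column name `features_vec` is made up for illustration:

import org.apache.spark.ml.functions.array_to_vector
import org.apache.spark.sql.functions.col

// Turn the array-of-doubles column into the ml vector type PCA expects.
// On older Spark versions, a UDF wrapping Vectors.dense achieves the same.
val withVectors = dataset.withColumn("features_vec", array_to_vector(col("features")))

The PCA stage would then read from that column via `.setInputCol("features_vec")`.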

Here is a working example of what you are trying to do.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{PCA, VectorAssembler}
import org.apache.spark.sql.functions.rand
import spark.implicits._

// a funky dataset with 3 features (`x1`, `x2`, `x3`) and a label `y`,
// the class we are trying to predict.
val dataset = spark.range(10)
    .select('id as "x1", rand() as "x2", ('id * 'id) as "x3")
    .withColumn("y", (('x2 * 3 + 'x1) cast "int").mod(2))
    .cache()

// splitting the dataset, that part was ok ;-)
val Array(train, test) = dataset
      .randomSplit(Array(0.7, 0.3), seed = 1234L)
      .map(_.cache())

// An assembler, the output name cannot be one of the inputs.
val assembler = new VectorAssembler()
    .setInputCols(Array("x1", "x2", "x3"))
    .setOutputCol("features")

// A pca, that part was ok as well
val pca = new PCA()
    .setInputCol("features")
    .setK(2)
    .setOutputCol("pcaFeatures")

// A LogisticRegression classifier. (KNN is not part of Spark's standard API, but
// requires the same minimum information: features and label)
val classifier = new LogisticRegression()
    .setFeaturesCol("pcaFeatures")
    .setLabelCol("y")

// And the full pipeline
val pipeline = new Pipeline().setStages(Array(assembler, pca, classifier))
val model = pipeline.fit(train)
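
As a quick follow-up sketch (not part of the original answer), you could score the held-out split and compute an accuracy; `prediction` is the classifier's default output column, and `MulticlassClassificationEvaluator` is part of the standard Spark ML API:

// Score the held-out split with the fitted pipeline
val predictions = model.transform(test)
predictions.select("y", "prediction").show(5)

// Accuracy on the test split, as a rough sanity check
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
val accuracy = new MulticlassClassificationEvaluator()
    .setLabelCol("y")
    .setPredictionCol("prediction")
    .setMetricName("accuracy")
    .evaluate(predictions)
println(s"Test accuracy: $accuracy")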