I want to run pca with KNN in spark. I have a file that contains id, features.
> KNN.printSchema
root
|-- id: int (nullable = true)
|-- features: double (nullable = true)
code:
val dataset = spark.read.parquet("/usr/local/spark/dataset/data/user")
val features = new VectorAssembler()
.setInputCols(Array("id", "features" ))
.setOutputCol("features")
val Array(train, test) = dataset
.randomSplit(Array(0.7, 0.3), seed = 1234L)
.map(_.cache())
//create PCA matrix to reduce feature dimensions
val pca = new PCA()
.setInputCol("features")
.setK(5)
.setOutputCol("pcaFeatures")
val knn = new KNNClassifier()
.setTopTreeSize(dataset.count().toInt / 5)
.setFeaturesCol("pcaFeatures")
.setPredictionCol("predicted")
.setK(1)
val pipeline = new Pipeline()
.setStages(Array(pca, knn))
.fit(train)
Above code block is throwing this exception
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Column features must be of type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 but was actually ArrayType(DoubleType,true).
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:42)
at org.apache.spark.ml.feature.PCAParams$class.validateAndTransformSchema(PCA.scala:54)
at org.apache.spark.ml.feature.PCAModel.validateAndTransformSchema(PCA.scala:125)
at org.apache.spark.ml.feature.PCAModel.transformSchema(PCA.scala:162)
at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:180)
at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:180)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)
at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:180)
at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:70)
at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:132)
at KNN$.main(KNN.scala:63)
at KNN.main(KNN.scala)
Basically, you are trying to split the dataset into training and test, assemble features, run a PCA and then a classifier to predict something. The overall logic is correct but there are several problems with your code.
features
to the output of the assembler, and you already have a column named that way. Since you do not use it, you don't see an error but if you were you would get this exception:java.lang.IllegalArgumentException: Output column features already exists.
setFeaturesCol
and the label you are trying to learn with setLabelCol
. You did not specified the label and by default, the label is "label"
. You don't have any column named that way, hence the exception spark throws at you.Here is a working example of what you are trying to do.
// a funky dataset with 3 features (`x1`, `x2`, `x`3) and a label `y`,
// the class we are trying to predict.
val dataset = spark.range(10)
.select('id as "x1", rand() as "x2", ('id * 'id) as "x3")
.withColumn("y", (('x2 * 3 + 'x1) cast "int").mod(2))
.cache()
// splitting the dataset, that part was ok ;-)
val Array(train, test) = dataset
.randomSplit(Array(0.7, 0.3), seed = 1234L)
.map(_.cache())
// An assembler, the output name cannot be one of the inputs.
val assembler = new VectorAssembler()
.setInputCols(Array("x1", "x2", "x3"))
.setOutputCol("features")
// A pca, that part was ok as well
val pca = new PCA()
.setInputCol("features")
.setK(2)
.setOutputCol("pcaFeatures")
// A LogisticRegression classifier. (KNN is not part of spark's standard API, but
// requires the same minimum information: features and label)
val classifier = new LogisticRegression()
.setFeaturesCol("pcaFeatures")
.setLabelCol("y")
// And the full pipeline
val pipeline = new Pipeline().setStages(Array(assembler, pca, classifier))
val model = pipeline.fit(train)
ok,I try it but error found "not found: value rand ".
Add
import org.apache.spark.sql.functions_.
thanks.but Field "x1" does not exist.
Is it necessary to convert file to LibSVM?
About
x1
, I know it does not exist in your data. It was just en example. You have to replacex1
,x2
,x3
by your features, andy
by your label.