user2297683

Spark structured streaming 2.2 and k-means

I am streaming data read from a folder stored on HDFS. I have the following small piece of code:

// Convert text into a Dataset of LogEntry rows and select the two columns we care about
  val df = rawData.flatMap(parseLog).select("ip", "status")

  // Confirm that this is a streaming DataFrame
  df.isStreaming


  val kmeans = new KMeans().setK(2).setSeed(1L)
  val model = kmeans.fit(df)

  // Evaluate clustering by computing Within Set Sum of Squared Errors.
  val WSSSE = model.computeCost(df)
  println(s"Within Set Sum of Squared Errors = $WSSSE")

  // Shows the K-means result
  println("Cluster Centers: ")
  model.clusterCenters.foreach(println)

When I run the above, I get the following error:

java.lang.IllegalArgumentException: Field "features" does not exist.
  at org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:266)
  at org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:266)
  at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
  at scala.collection.AbstractMap.getOrElse(Map.scala:59)
  at org.apache.spark.sql.types.StructType.apply(StructType.scala:265)
  at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:40)
  at org.apache.spark.ml.clustering.KMeansParams$class.validateAndTransformSchema(KMeans.scala:93)
  at org.apache.spark.ml.clustering.KMeans.validateAndTransformSchema(KMeans.scala:254)
  at org.apache.spark.ml.clustering.KMeans.transformSchema(KMeans.scala:340)
  at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)
  at org.apache.spark.ml.clustering.KMeans.fit(KMeans.scala:305)
  at StructuredStreaming$.main(<console>:189)
  ... 90 elided

I am completely stumped on this. Any assistance would be appreciated.

UPDATE

I made the following modification based on EmiCareOfCell44's answer:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

val assembler = new VectorAssembler().setInputCols(Array("ip", "status")).setOutputCol("features")
val output = assembler.transform(df).select("features")
val kmeans = new KMeans().setK(2).setSeed(1L)
val model = kmeans.fit(output)

The code now loads but when I go to run it, I get the following error:

java.lang.IllegalArgumentException: Data type StringType is not supported.
  at org.apache.spark.ml.feature.VectorAssembler$$anonfun$transformSchema$1.apply(VectorAssembler.scala:121)
  at org.apache.spark.ml.feature.VectorAssembler$$anonfun$transformSchema$1.apply(VectorAssembler.scala:117)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at org.apache.spark.ml.feature.VectorAssembler.transformSchema(VectorAssembler.scala:117)
  at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)
  at org.apache.spark.ml.feature.VectorAssembler.transform(VectorAssembler.scala:54)
  at StructuredStreaming$.main(<console>:129)
  ... 60 elided

I think it is getting closer, though, and just needs a tweak.

Answers (1)

Emiliano Martinez

You have to use VectorAssembler first to create the feature vector. Something like:

  val assembler = new VectorAssembler()
    .setInputCols(Array("ip", "status"))
    .setOutputCol("features")
  val df2 = assembler.transform(df).select("features")
  val kmeans = new KMeans().setK(2).setSeed(1L)
  val model = kmeans.fit(df2)
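Note that if `ip` and `status` are string columns, VectorAssembler will still fail with `Data type StringType is not supported`, because it only accepts numeric, boolean, and vector input columns, so the strings have to be converted to numbers first. A minimal sketch, assuming `status` holds numeric HTTP status codes as strings and using a StringIndexer to turn `ip` into a categorical index (the intermediate column names `ipIndex` and `statusNum` are just illustrative):

```scala
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.sql.functions.col

// Map each distinct ip string to a numeric category index
val ipIndexer = new StringIndexer()
  .setInputCol("ip")
  .setOutputCol("ipIndex")

// Cast the status codes from string to double
val dfNumeric = ipIndexer.fit(df).transform(df)
  .withColumn("statusNum", col("status").cast("double"))

// Assemble the numeric columns into the "features" vector that KMeans expects
val assembler = new VectorAssembler()
  .setInputCols(Array("ipIndex", "statusNum"))
  .setOutputCol("features")
val features = assembler.transform(dfNumeric).select("features")

val kmeans = new KMeans().setK(2).setSeed(1L)
val model = kmeans.fit(features)
```

Also be aware that `fit` needs a batch DataFrame: on a streaming DataFrame Spark will refuse with "Queries with streaming sources must be executed with writeStream.start()", so for truly streaming input you would either train on a static snapshot of the data or use StreamingKMeans from the older DStream-based API.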
