Nikhil S

Reputation: 1209

KMeansModel.clusterCenters returns NULL

I am using AWS Glue to run K-means clustering on my dataset. I want to find not only the cluster labels but also the cluster centers, and I am failing to find the latter.

In the code below, model.clusterCenters returns NULL. The K-means clustering itself works fine and returns the cluster label, i.e. the clusterInstance variable.

import java.time.LocalDate
import java.time.format.DateTimeFormatter

import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.{DynamicFrame, GlueContext}
import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object Clustering {
  case class ObjectDay(realnumber: Double, bnumber : Double, blockednumber: Double,
                       creationdate : String, fname : String, uniqueid : Long, registrationdate : String,
                       plusnumber : Double, cvalue : Double, hvalue : Double)
  case class ClusterInfo(instance: Int, centers: String)

  def main(args: Array[String]): Unit = {
    val sc: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(sc)
    val spark: SparkSession = glueContext.getSparkSession
    import spark.implicits._

    // write your code here - start
    // Data Catalog: database and table name
    val dbName = "dbname"
    val tblName = "raw"
    val sqlText = "SELECT <columns removed> FROM viewname WHERE `creation_date` ="

    // S3 location for output
    val outputDir = "s3://blucket/path/"

    // Read data into a DynamicFrame using the Data Catalog metadata
    val rawDyf: DynamicFrame = glueContext.getCatalogSource(database = dbName, tableName = tblName).getDynamicFrame()

    // get only single day data with only numbers
    // Spark SQL on a Spark dataframe
    val numberDf = rawDyf.toDF()
    numberDf.createOrReplaceTempView("viewname")

    def getDataViaSql(runDate : LocalDate): RDD[ObjectDay] ={
      val data = spark.sql(s"${sqlText} '${runDate.toString}'")
      data.as[ObjectDay].rdd
    }

    def getDenseVector(rddnumbers: RDD[ObjectDay]): RDD[linalg.Vector]={
      rddnumbers.map(s => Vectors.dense(Array(s.realnumber, s.bnumber, s.blockednumber))).cache()
    }

    def getClusters( numbers: RDD[linalg.Vector] ): RDD[ClusterInfo]  = {
      // Trains a k-means model
      val model: KMeansModel = KMeans.train(numbers, 2, 20)
      val centers: Array[linalg.Vector] = model.clusterCenters

      //put together unique_ids with cluster predictions
      val clusters: RDD[Int] = model.predict(numbers)

      clusters.map{ clusterInstance =>
          ClusterInfo(clusterInstance.toInt, centers(clusterInstance).toJson)
      }
    }
    def combineDataAndClusterInstances(rddnumbers: RDD[ObjectDay], clusterCenters: RDD[ClusterInfo]): DataFrame = {
      val numbersWithCluster = rddnumbers.zip(clusterCenters)
      numbersWithCluster
        .map( x =>
          (x._1.realnumber, x._1.bnumber, x._1.blockednumber, x._1.creationdate, x._1.fname,
            x._1.uniqueid, x._1.registrationdate, x._1.plusnumber, x._1.cvalue, x._1.hvalue,
            x._2.instance, x._2.centers)
        )
        .toDF("realnumber", "bnumber", "blockednumber", "creationdate",
          "fname", "uniqueid", "registrationdate", "plusnumber", "cvalue", "hvalue",
          "clusterInstance", "clusterCenter")
    }
    def process(runDate : LocalDate): DataFrame = {
      val rddnumbers = getDataViaSql( runDate)
      val dense = getDenseVector(rddnumbers)
      val clusterCenters = getClusters(dense)
      combineDataAndClusterInstances(rddnumbers, clusterCenters)
    }

    val startdt = LocalDate.parse("2018-01-01", DateTimeFormatter.ofPattern("yyyy-MM-dd"))

    val dfByDates = (0 to 240)
      .map(days => startdt.plusDays(days))
      .map(process(_))

    val result = dfByDates.tail.fold(dfByDates.head)((accDF, newDF) => accDF.union(newDF))

    val output = DynamicFrame(result, glueContext).withName(name="prediction")

    // write your code here - end
    glueContext.getSinkWithFormat(connectionType = "s3",
      options = JsonOptions(Map("path" -> outputDir)), format = "csv").writeDynamicFrame(output)
  }
}
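For reference, here is a minimal standalone sketch (using the same mllib KMeans API, with made-up points and a hypothetical CenterCheck object) that just prints clusterCenters to the driver log, independent of the Glue sink:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.mllib.clustering.KMeans
    import org.apache.spark.mllib.linalg.Vectors

    object CenterCheck {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("center-check"))

        // A handful of made-up 3-dimensional points forming two obvious groups
        val points = sc.parallelize(Seq(
          Vectors.dense(1.0, 1.0, 1.0),
          Vectors.dense(1.1, 0.9, 1.0),
          Vectors.dense(9.0, 9.5, 10.0),
          Vectors.dense(10.0, 9.0, 9.5)
        ))

        // Same call shape as in the job above: k = 2, maxIterations = 20
        val model = KMeans.train(points, 2, 20)

        // clusterCenters is an Array[Vector]; printing it shows whether the
        // model actually produced centers, independent of the S3/CSV output
        model.clusterCenters.zipWithIndex.foreach { case (center, i) =>
          println(s"center $i: ${center.toJson}")
        }

        sc.stop()
      }
    }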

I can successfully find the cluster centers on the same data using Python's scikit-learn library.

UPDATE: The code above is the complete Scala code, which runs as a Glue job. I am not getting any errors while running the job; I just don't get any cluster centers.

What am I missing?

Upvotes: 0

Views: 270

Answers (1)

Nikhil S

Reputation: 1209

Never mind. It is generating cluster centers.

I didn't see the S3 output files until now.

I was running a Glue Crawler over the output and looking at the results in AWS Athena. The crawler inferred a struct/array data type for the clusterCenter column, and Athena failed to parse the JSON that is stored as a string in the CSV output.
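If querying the result through Athena matters, one way around this (a sketch only; ClusterInfoFlat and getClustersFlat are names I made up, assuming the 3-feature vectors built in getDenseVector) is to emit each center coordinate as its own double column instead of a JSON string:

    import org.apache.spark.mllib.clustering.KMeans
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.rdd.RDD

    // One row per input point: its cluster id plus the center's three coordinates
    case class ClusterInfoFlat(instance: Int, center0: Double, center1: Double, center2: Double)

    def getClustersFlat(numbers: RDD[Vector]): RDD[ClusterInfoFlat] = {
      val model = KMeans.train(numbers, 2, 20)
      val centers = model.clusterCenters

      // predict() gives the cluster index for each point; look up that center
      // and spread its coordinates into separate fields instead of JSON
      model.predict(numbers).map { cluster =>
        val c = centers(cluster)
        ClusterInfoFlat(cluster, c(0), c(1), c(2))
      }
    }

With that, the CSV contains plain numeric columns, so the crawler infers doubles and Athena can read them without any JSON parsing.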

Sorry to bother.

Upvotes: 0
