Reputation: 1209
I am using AWS glue to execute Kmeans clustering on my dataset. I wish to find not only the cluster labels but also the cluster centers. I am failing to find the later.
In the code below model.clusterCenters returns NULL. KMeans clustering works fine, and it returns the cluster label i.e. clusterInstance variable.
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.{DynamicFrame, GlueContext}
import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
object Clustering {
case class ObjectDay(realnumber: Double, bnumber : Double, blockednumber: Double,
creationdate : String, fname : String, uniqueid : Long, registrationdate : String,
plusnumber : Double, cvalue : Double, hvalue : Double)
case class ClusterInfo( instance: Int, centers: String)
def main(args: Array[String]): Unit = {
val sc: SparkContext = new SparkContext()
val glueContext: GlueContext = new GlueContext(sc)
val spark: SparkSession = glueContext.getSparkSession
import spark.implicits._
// write your code here - start
// Data Catalog: database and table name
val dbName = "dbname"
val tblName = "raw"
val sqlText = "SELECT <columns removed> FROM viewname WHERE `creation_date` ="
// S3 location for output
val outputDir = "s3://blucket/path/"
// Read data into a DynamicFrame using the Data Catalog metadata
val rawDyf: DynamicFrame = glueContext.getCatalogSource(database = dbName, tableName = tblName).getDynamicFrame()
// get only single day data with only numbers
// Spark SQL on a Spark dataframe
val numberDf = rawDyf.toDF()
numberDf.createOrReplaceTempView("viewname")
def getDataViaSql(runDate : LocalDate): RDD[ObjectDay] ={
val data = spark.sql(s"${sqlText} '${runDate.toString}'")
data.as[ObjectDay].rdd
}
def getDenseVector(rddnumbers: RDD[ObjectDay]): RDD[linalg.Vector]={
rddnumbers.map(s => Vectors.dense(Array(s.realnumber, s.bnumber, s.blockednumber))).cache()
}
def getClusters( numbers: RDD[linalg.Vector] ): RDD[ClusterInfo] = {
// Trains a k-means model
val model: KMeansModel = KMeans.train(numbers, 2, 20)
val centers: Array[linalg.Vector] = model.clusterCenters
//put together unique_ids with cluster predictions
val clusters: RDD[Int] = model.predict(numbers)
clusters.map{ clusterInstance =>
ClusterInfo(clusterInstance.toInt, centers(clusterInstance).toJson)
}
}
def combineDataAndClusterInstances(rddnumbers : RDD[ObjectDay], clusterCenters: RDD[ClusterInfo]): DataFrame ={
val numbersWithCluster = rddnumbers.zip(clusterCenters)
numbersWithCluster.map(
x =>
(x._1.realnumber, x._1.bnumber, x._1.blockednumber, x._1.creationdate, x._1.fname,
x._1.uniqueid, x._1.registrationdate, x._1.plusnumber, x._1.cvalue, x._1.hvalue,
x._2.instance, x._2.centers)
)
.toDF("realnumber", "bnumber", "blockednumber", "creationdate",
"fname","uniqueid", "registrationdate", "plusnumber", "cvalue", "hvalue",
"clusterInstance", "clusterCenter")
}
def process(runDate : LocalDate): DataFrame = {
val rddnumbers = getDataViaSql( runDate)
val dense = getDenseVector(rddnumbers)
val clusterCenters = getClusters(dense)
combineDataAndClusterInstances(rddnumbers, clusterCenters)
}
val startdt = LocalDate.parse("2018-01-01", DateTimeFormatter.ofPattern("yyyy-MM-dd"))
val dfByDates = (0 to 240)
.map(days => startdt.plusDays(days))
.map(process(_))
val result = dfByDates.tail.fold(dfByDates.head)((accDF, newDF) => accDF.union(newDF))
val output = DynamicFrame(result, glueContext).withName(name="prediction")
// write your code here - end
glueContext.getSinkWithFormat(connectionType = "s3",
options = JsonOptions(Map("path" -> outputDir)), format = "csv").writeDynamicFrame(output)
}
}
I can successfully find the cluster centres using Python sklearn library on the same data.
UPDATED: Showing the complete Scala code which runs as Glue job. Also I am not getting any error while running the job. I just dont get any cluster centres.
What am I missing ?
Upvotes: 0
Views: 270
Reputation: 1209
Nevermind. It is generating cluster centres.
I didnt see the S3 output files until now.
I was running Glue Crawler and looking at the results in AWS Athena. The crawler created a struct or array column datatype for clustercenter column and Athena failed to parse and read the JSON stored as string in the CSV output.
Sorry to bother.
Upvotes: 0