Guillermo Herrera

Reputation: 510

How to work with Java Apache Spark MLlib when DataFrame has columns?

So I'm new to Apache Spark and I have a file that looks like this:

Name     Size    Records 
File1    1,000   104,370 
File2    950     91,780 
File3    1,500   109,123 
File4    2,170   113,888
File5    2,000   111,974
File6    1,820   110,666
File7    1,200   106,771 
File8    1,500   108,991 
File9    1,000   104,007
File10   1,300   107,037
File11   1,900   111,109
File12   1,430   108,051
File13   1,780   110,006
File14   2,010   114,449
File15   2,017   114,889

This is my sample/test data. I'm working on an anomaly detection program: I have to test other files with the same format but different values and detect which ones have anomalies in the size and records values (for example, if the size or records of another file differ a lot from the standard ones, or if size and records are not proportional to each other). I decided to try different ML algorithms, starting with k-means. I tried passing this file to the following line:

KMeansModel model = kmeans.fit(file);

file has already been parsed into a Dataset variable. However, I get an error, and I'm pretty sure it has to do with the structure/schema of the file. Is there a way to work with structured/labeled/organized data when fitting it to a model?

I get the following error: Exception in thread "main" java.lang.IllegalArgumentException: Field "features" does not exist.

And this is the code:

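import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.clustering.KMeans;
import org.apache.spark.ml.clustering.KMeansModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class practice {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Anomaly Detection").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        SparkSession spark = SparkSession
                .builder()
                .appName("Anomaly Detection")
                .getOrCreate();

        String day1 = "C:\\Users\\ZK0GJXO\\Documents\\day1.txt";

        // Read the tab-delimited file, using the first line as column names.
        Dataset<Row> df = spark.read()
                .option("header", "true")
                .option("delimiter", "\t")
                .csv(day1);
        df.show();

        KMeans kmeans = new KMeans().setK(2).setSeed(1L);
        // This is the call that fails with: Field "features" does not exist.
        KMeansModel model = kmeans.fit(df);
    }
}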

Thanks

Upvotes: 1

Views: 966

Answers (1)

Brian Cajes

Reputation: 3402

By default, Spark ML estimators such as KMeans read their input from a column called "features", which has to hold a feature vector. Your DataFrame has no such column, hence the error. One can specify a different input column name via the setFeaturesCol method: http://spark.apache.org/docs/latest/api/java/org/apache/spark/ml/clustering/KMeans.html#setFeaturesCol(java.lang.String)

update:

One can combine multiple columns into a single feature vector using VectorAssembler:

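import org.apache.spark.ml.feature.VectorAssembler;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.regexp_replace;

// Values like "1,000" are read as strings: strip the commas and cast to double first.
Dataset<Row> numeric_df = df
    .withColumn("Size", regexp_replace(col("Size"), ",", "").cast("double"))
    .withColumn("Records", regexp_replace(col("Records"), ",", "").cast("double"));

VectorAssembler assembler = new VectorAssembler()
    .setInputCols(new String[]{"Size", "Records"}) // names should match the file header
    .setOutputCol("features");
Dataset<Row> vectorized_df = assembler.transform(numeric_df);

KMeans kmeans = new KMeans().setK(2).setSeed(1L);
KMeansModel model = kmeans.fit(vectorized_df);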
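
Note that the Size and Records values in the sample file contain thousands separators ("1,000", "104,370"), so they need to be turned into plain numbers before they can go into a feature vector. The conversion itself is small; here is a minimal plain-Java sketch of it, assuming US-style comma separators as in the sample. The SeparatedNumbers class and its parse method are hypothetical names used only for illustration and are not part of Spark.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; not part of Spark.
final class SeparatedNumbers {

    // Turns a value such as "104,370" into a double by dropping the commas.
    // Assumes commas are thousands separators, as in the sample file.
    static double parse(String raw) {
        return Double.parseDouble(raw.trim().replace(",", ""));
    }

    public static void main(String[] args) {
        // A few Size values taken from the sample data.
        List<String> sizes = Arrays.asList("1,000", "950", "2,170");
        for (String s : sizes) {
            System.out.println(s + " -> " + parse(s));
        }
    }
}
```

On a DataFrame the same cleanup is typically done column-wise, for example with regexp_replace followed by a cast to double.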

One can further streamline and chain these feature transformations with the pipeline API https://spark.apache.org/docs/latest/ml-pipeline.html#example-pipeline

Upvotes: 3
