progNewbie

Reputation: 4822

How to use map-function in SPARK with Java

I am trying to read a CSV file in Spark and I want to split the lines, which are comma-separated, so that I end up with an RDD holding a two-dimensional array (one array of fields per line). I am very new to Spark.

I tried to do this:

public class SimpleApp 
{   
    public static void main(String[] args) throws Exception 
    {       
        String master = "local[2]";
        String csvInput = "/home/userName/Downloads/countrylist.csv";
        String csvOutput = "/home/userName/Downloads/countrylist";

        JavaSparkContext sc = new JavaSparkContext(master, "loadwholecsv", System.getenv("SPARK_HOME"), System.getenv("JARS"));

        JavaRDD<String> csvData = sc.textFile(csvInput, 1);
        JavaRDD<String> words = csvData.map(new Function <List<String>>() { //line 43
              @Override
              public List<String> call(String s) {
                return Arrays.asList(s.split("\\s*,\\s*"));
              }
            });

        words.saveAsTextFile(csvOutput);
    }
}

This should split each line and return an ArrayList, but I am not sure about it. Instead I get this error:

SimpleApp.java:[43,58] wrong number of type arguments; required 2

Upvotes: 8

Views: 39227

Answers (3)

RazvanParautiu

Reputation: 2938

This is a sample of the code from the https://opencredo.com/data-analytics-using-cassandra-and-spark/ tutorial, translated into Java.

Scala code:

val includedStatuses = Set("COMPLETED", "REPAID")
val now = new Date()
sc.cassandraTable("cc", "cc_transactions")
  .select("customerid", "amount", "card", "status", "id")
  .where("id < minTimeuuid(?)", now)
  .filter(includedStatuses contains _.getString("status"))
  .keyBy(row => (row.getString("customerid"), row.getString("card")))
  .map { case (key, value) => (key, value.getInt("amount")) }
  .reduceByKey(_ + _)
  .map { case ((customerid, card), balance) => (customerid, card, balance, now) }
  .saveToCassandra("cc", "cc_balance", SomeColumns("customerid", "card", "balance", "updated_at"))

Java code:

SparkContextJavaFunctions functions = CassandraJavaUtil.javaFunctions(ProjectPropertie.context);
JavaRDD<Balance> balances = functions.cassandraTable(ProjectPropertie.KEY_SPACE, Transaction.TABLE_NAME)
        .select("customerid", "amount", "card", "status", "id")
        .where("id < minTimeuuid(?)", date)
        .filter(row -> row.getString("status").equals("COMPLETED"))
        .keyBy(row -> new Tuple2<>(row.getString("customerid"), row.getString("card")))
        .mapToPair(row -> new Tuple2<>(row._1, row._2.getInt("amount")))
        .reduceByKey((i1, i2) -> i1.intValue() + i2.intValue())
        .flatMap(new FlatMapFunction<Tuple2<Tuple2<String, String>, Integer>, Balance>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterator<Balance> call(Tuple2<Tuple2<String, String>, Integer> r) throws Exception {
                List<Balance> list = new ArrayList<Balance>();
                list.add(new Balance(r._1._1, r._1._2, r._2, reportDate));
                return list.iterator();
            }
        }).cache();

Here ProjectPropertie.context is the SparkContext. This is how you can get a SparkContext (you should use only one context per JVM):

SparkConf conf = new SparkConf(true).setAppName("App_name").setMaster("local[2]")
        .set("spark.executor.memory", "1g")
        .set("spark.cassandra.connection.host", "127.0.0.1,172.17.0.2")
        .set("spark.cassandra.connection.port", "9042")
        .set("spark.cassandra.auth.username", "cassandra")
        .set("spark.cassandra.auth.password", "cassandra");
SparkContext context = new SparkContext(conf);
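
If you prefer the JavaSparkContext API used in the question, it can wrap an existing SparkContext; a one-line sketch:

    JavaSparkContext jsc = new JavaSparkContext(context); // Java-friendly wrapper around the same SparkContext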

As the data source I'm using Cassandra, where 172.17.0.2 is the Docker container in which my Cassandra node is running and 127.0.0.1 is the host (local in this case).
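
Balance here is just a bean from my project, not something from the Spark or Cassandra APIs. Judging from the constructor call new Balance(customerid, card, amount, date), a minimal sketch could look like this (the class shape and field names are assumptions):

    import java.io.Serializable;
    import java.util.Date;

    // Hypothetical sketch of the Balance bean used above; field names are assumptions.
    public class Balance implements Serializable {
        private final String customerid;
        private final String card;
        private final int amount;
        private final Date updatedAt;

        public Balance(String customerid, String card, int amount, Date updatedAt) {
            this.customerid = customerid;
            this.card = card;
            this.amount = amount;
            this.updatedAt = updatedAt;
        }
        // getters omitted for brevity
    }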

Upvotes: 2

Harish Pathak
Harish Pathak

Reputation: 1607

This is what you should do...

//====== Using flatMap (RDD of words) ==============
JavaRDD<String> csvData = spark.textFile(GlobalConstants.STR_INPUT_FILE_PATH, 1);
JavaRDD<String> counts = csvData.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterable<String> call(String s) {
        return Arrays.asList(s.split("\\s*,\\s*"));
    }
});

//====== Using map (RDD of Lists of words) ==============
JavaRDD<String> csvData = spark.textFile(GlobalConstants.STR_INPUT_FILE_PATH, 1);
JavaRDD<List<String>> counts = csvData.map(new Function<String, List<String>>() {
    @Override
    public List<String> call(String s) {
        return Arrays.asList(s.split("\\s*,\\s*"));
    }
});

//=====================================

counts.saveAsTextFile(GlobalConstants.STR_OUTPUT_FILE_PATH);
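
If what you actually want is the per-line array from the question (the "two dimensional" structure), a plain map to String[] also works. A small sketch, assuming Java 8 lambdas and a JavaSparkContext named sc:

    // Each RDD element is the array of comma-separated fields from one line.
    JavaRDD<String[]> rows = sc.textFile(csvInput).map(s -> s.split("\\s*,\\s*"));

Note that calling saveAsTextFile on an RDD of arrays writes the default array toString, so re-join the fields (for example with String.join(",", fields)) before saving if you need readable output.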

Upvotes: 0

Holden
Holden

Reputation: 7452

So there are two small issues with the program. The first is that you probably want flatMap rather than map: since you are trying to return an RDD of words rather than an RDD of Lists of words, flatMap lets us flatten the result. The other is that our function class also requires the type of the input it is called on. I'd replace the JavaRDD words... with:

JavaRDD<String> words = rdd.flatMap(
    new FlatMapFunction<String, String>() {
        public Iterable<String> call(String s) {
            return Arrays.asList(s.split("\\s*,\\s*"));
        }
    });
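
On Spark 2.0 and later, FlatMapFunction.call returns an Iterator rather than an Iterable, and with Java 8 lambdas the same idea becomes (a sketch, keeping the rdd name from above):

    JavaRDD<String> words = rdd.flatMap(
        s -> Arrays.asList(s.split("\\s*,\\s*")).iterator());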

Upvotes: 9
