Sree Eedupuganti

Reputation: 440

Can't access the data in Kafka Spark Streaming globally

I am trying to stream data from Kafka into Spark:

JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class, 
                String.class, 
                StringDecoder.class, 
                StringDecoder.class, 
                kafkaParams, topics);

Here I am iterating over the JavaPairInputDStream to process the RDDs:

directKafkaStream.foreachRDD(rdd -> {
    rdd.foreachPartition(items -> {
        while (items.hasNext()) {
            String[] State = items.next()._2.split("\\,");
            System.out.println(State[2] + "," + State[3] + "," + State[4] + "--");
        }
    });
});

I am able to fetch the data inside foreachRDD, but my requirement is to access the State array globally. When I try to access the State array globally, I get an exception:

java.lang.IndexOutOfBoundsException: Index: 0, Size: 0

Any suggestions? Thanks.

Upvotes: 0

Views: 233

Answers (1)

Darshan

Reputation: 2333

This is more a case of joining your lookup table with the streaming RDD to get all the items whose 'code' and 'violationCode' fields match.

The flow should be like this:

  1. Create an RDD from the Hive lookup table => lookupRdd
  2. Create a DStream from the Kafka stream
  3. For each RDD in the DStream, join lookupRdd with streamRdd, process the joined items (e.g., calculate the sum of amount), and save the processed result (a minimal join sketch follows this list).
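For reference on step 3, a pair-RDD join matches on the key and yields Tuple2<key, Tuple2<streamValue, lookupValue>>, keeping only the keys present on both sides. Below is a minimal, self-contained sketch of that shape; the data, class name, and master setting are illustrative only, not from the original question.

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class JoinSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("join-sketch").setMaster("local[2]"));

        // stream side: <violationCode, raw record>
        JavaPairRDD<String, String> streamRdd = sc.parallelizePairs(Arrays.asList(
                new Tuple2<>("V1", "rec-a"), new Tuple2<>("V2", "rec-b")));

        // lookup side: <code, lookup row>
        JavaPairRDD<String, String> lookupRdd = sc.parallelizePairs(Arrays.asList(
                new Tuple2<>("V1", "row-1")));

        // inner join keeps only matching keys; prints [(V1,(rec-a,row-1))]
        JavaPairRDD<String, Tuple2<String, String>> joined = streamRdd.join(lookupRdd);
        System.out.println(joined.collect());

        sc.stop();
    }
}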

Note: the code below is incomplete. Please complete all the TODO comments.

JavaPairDStream<String, String> streamPair = directKafkaStream.mapToPair(new PairFunction<Tuple2<String, String>, String, String>() {
    @Override
    public Tuple2<String, String> call(Tuple2<String, String> tuple2) throws Exception {
        System.out.println("Tuple2 Message is----------" + tuple2._2());
        String[] state = tuple2._2().split("\\,");
        return new Tuple2<>(state[4], tuple2._2()); // pair <violationCode, data>
    }
});

streamPair.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
    JavaPairRDD<String, String> hivePairRdd = null;

    @Override
    public Void call(JavaPairRDD<String, String> stringStringJavaPairRDD) throws Exception {
        if (hivePairRdd == null) {
            // build the lookup RDD lazily, on the first batch only
            hivePairRdd = initHiveRdd();
        }
        JavaPairRDD<String, Tuple2<String, String>> joinedRdd = stringStringJavaPairRDD.join(hivePairRdd);
        System.out.println(joinedRdd.take(10));
        // TODO process joinedRdd here and save the results.
        joinedRdd.count(); // to trigger an action
        return null;
    }
});
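Since hivePairRdd is built once but joined against every batch, it may be worth persisting it so the lookup side is not recomputed per batch. A one-line variant of the lazy initialization above, assuming the lookup table fits in executor memory:

hivePairRdd = initHiveRdd().cache(); // keep the lookup side materialized across batches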

public static JavaPairRDD<String, String> initHiveRdd() {
    JavaRDD<String> hiveTableRDD = null; //todo code to create RDD from hive table
    JavaPairRDD<String, String> hivePairRdd = hiveTableRDD.mapToPair(new PairFunction<String, String, String>() {
        @Override
        public Tuple2<String, String> call(String row) throws Exception {
            String code = null; //TODO process 'row' and get 'code' field
            return new Tuple2<>(code, row);
        }
    });
    return hivePairRdd;
}
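As one possible way to fill in the Hive TODO above: on Spark 1.x (which the StringDecoder-based createDirectStream implies), the table can be read through a HiveContext and each Row flattened back into a delimited line. This is only a sketch under that assumption; the table name 'violation_lookup' and the helper name 'loadHiveTable' are hypothetical.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.hive.HiveContext;

public static JavaRDD<String> loadHiveTable(JavaSparkContext sc) {
    HiveContext hiveContext = new HiveContext(sc.sc()); // requires spark-hive on the classpath
    return hiveContext
            .sql("SELECT * FROM violation_lookup")      // hypothetical lookup table
            .toJavaRDD()
            .map(row -> row.mkString(","));             // flatten each Row to a CSV-style line
}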

Upvotes: 1
