cSteusloff

Reputation: 2628

Spark SQL failed in Spark Streaming (KafkaStream)

I use Spark SQL in a Spark Streaming job to look up a Hive table. The Kafka streaming part works without problems. If I run hiveContext.runSqlHive(sqlQuery); outside of directKafkaStream.foreachRDD it also works without problems, but I need the Hive table lookup inside the streaming job. Using JDBC (jdbc:hive2://) would work, but I want to use Spark SQL.

The relevant parts of my source code look as follows:

// set context
SparkConf sparkConf = new SparkConf().setAppName(appName).set("spark.driver.allowMultipleContexts", "true");
SparkContext sparkSqlContext = new SparkContext(sparkConf);
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(batchDuration));
HiveContext hiveContext = new HiveContext(sparkSqlContext);

// Initialize Direct Spark Kafka Stream. Starts from top
JavaPairInputDStream<String, String> directKafkaStream =
                KafkaUtils.createDirectStream(streamingContext,
                        String.class,
                        String.class,
                        StringDecoder.class,
                        StringDecoder.class,
                        kafkaParams,
                        topicsSet);

// work on stream                   
directKafkaStream.foreachRDD((Function<JavaPairRDD<String, String>, Void>) rdd -> {
    rdd.foreachPartition(tuple2Iterator -> {
        // get message
        Tuple2<String, String> item = tuple2Iterator.next();

        // lookup
        String sqlQuery = "SELECT something FROM somewhere";
        Seq<String> resultSequence = hiveContext.runSqlHive(sqlQuery);
        List<String> result = scala.collection.JavaConversions.seqAsJavaList(resultSequence);

    });
    return null;
});

// Start the computation
streamingContext.start();
streamingContext.awaitTermination();            

I don't get a meaningful error, even if I surround the call with a try-catch block.

I hope someone can help - Thanks.

// edit: The solution looks like this:

// work on stream                   
directKafkaStream.foreachRDD((Function<JavaPairRDD<String, String>, Void>) rdd -> {
    // driver
    Map<String, String> lookupMap = getResult(hiveContext); //something with hiveContext.runSqlHive(sqlQuery);
    rdd.foreachPartition(tuple2Iterator -> {
        // worker
        while (tuple2Iterator != null && tuple2Iterator.hasNext()) {
            // get message
            Tuple2<String, String> item = tuple2Iterator.next();
            // lookup
            String result = lookupMap.get(item._2());
        }
    });
    return null;
});
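getResult simply runs the Hive query on the driver and turns the rows into a lookup map, roughly like the sketch below (the concrete query, the key/value columns, and the tab-splitting of the rows returned by runSqlHive are placeholders/assumptions, not my exact code):

// driver-side helper: run the Hive query and build the lookup map
private static Map<String, String> getResult(HiveContext hiveContext) {
    String sqlQuery = "SELECT key, value FROM somewhere"; // placeholder query
    Seq<String> resultSequence = hiveContext.runSqlHive(sqlQuery);
    List<String> rows = scala.collection.JavaConversions.seqAsJavaList(resultSequence);

    Map<String, String> lookupMap = new HashMap<>();
    for (String row : rows) {
        // assuming each row comes back as one tab-separated string
        String[] columns = row.split("\t", -1);
        lookupMap.put(columns[0], columns[1]);
    }
    return lookupMap;
}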

Upvotes: 0

Views: 524

Answers (1)

zero323

Reputation: 330063

Just because you want to use Spark SQL doesn't make it possible. Spark's rule number one is: no nested actions, transformations or distributed data structures. In your code, hiveContext.runSqlHive is called inside foreachPartition, i.e. from executor-side code, so you are trying to use a driver-side context inside a distributed operation.

If you can express your query, for example as a join, you can push it one level higher into foreachRDD, and this pretty much exhausts your options for using Spark SQL here:

directKafkaStream.foreachRDD((Function<JavaPairRDD<String, String>, Void>) rdd -> {
    hiveContext.runSqlHive(sqlQuery);  // runs on the driver
    rdd.foreachPartition(...);         // distributed part of the work
    return null;
});

Otherwise a direct JDBC connection can be a valid option.
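If you go that route, each partition opens its own connection to HiveServer2 and runs the lookup there, without touching any Spark context on the workers. A minimal sketch of the part inside foreachRDD, assuming HiveServer2 is reachable at the URL below and the Hive JDBC driver is on the executor classpath (host, port and query are placeholders):

rdd.foreachPartition(tuple2Iterator -> {
    // executor side: plain java.sql JDBC, no HiveContext involved
    try (Connection connection = DriverManager.getConnection("jdbc:hive2://hive-host:10000/default");
         Statement statement = connection.createStatement();
         ResultSet resultSet = statement.executeQuery("SELECT something FROM somewhere")) {
        while (resultSet.next()) {
            String value = resultSet.getString(1);
            // ... combine with the Kafka messages from tuple2Iterator
        }
    }
});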

Upvotes: 1
