BdEngineer

Reputation: 3199

How to control processing of a Spark stream while there is no data in the Kafka topic

I am using spark-sql 2.4.1, spark-cassandra-connector_2.11-2.4.1.jar, and Java 8.

I have a Cassandra table like this:

CREATE TABLE company(company_id int, start_date date, company_name text, PRIMARY KEY (company_id, start_date))
WITH CLUSTERING ORDER BY (start_date DESC);

The field start_date here is a derived field, which is calculated in the business logic.

I have spark-sql streaming code in which I call the MapFunction below.

public static MapFunction<Company, CompanyTransformed> mapFunInsertCompany = (record) -> {

  CompanyTransformed rec = new CompanyTransformed();

  rec.setCompany_id(record.getCompanyId());
  rec.setCompany_name(record.getCompanyName());

  // "I" = insert: keep the record's create date as start_date
  if (record.getChangeFlag().equalsIgnoreCase("I") && record.getCreateDate() != null)
    rec.setStart_date(record.getCreateDate());
  // "U" = update: set start_date to tomorrow (today + 86,400,000 ms)
  if (record.getChangeFlag().equalsIgnoreCase("U"))
    rec.setStart_date(new Date(CommonUtils.today().getTime() + 86400000));

  return rec;
};
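To make the failure mode concrete, here is a minimal stand-alone sketch of the same start_date derivation with the null cases made explicit. The class and helper names (`DeriveStartDate`, `deriveStartDate`) are hypothetical, and the `Company`/`CompanyTransformed` beans from the question are reduced to raw values:

```java
import java.util.Date;

public class DeriveStartDate {

    static final long ONE_DAY_MS = 86_400_000L;

    // Returns the derived start_date, or null when neither rule applies --
    // exactly the gap that breaks the Cassandra primary key in the question.
    static Date deriveStartDate(String changeFlag, Date createDate, Date today) {
        if (changeFlag == null) {
            return null; // guard: equalsIgnoreCase on a null flag would throw NPE
        }
        if (changeFlag.equalsIgnoreCase("I") && createDate != null) {
            return createDate; // insert: keep the record's create date
        }
        if (changeFlag.equalsIgnoreCase("U")) {
            return new Date(today.getTime() + ONE_DAY_MS); // update: tomorrow
        }
        return null;
    }

    public static void main(String[] args) {
        Date today = new Date(1_000_000_000_000L);
        Date created = new Date(999_999_000_000L);

        System.out.println(deriveStartDate("I", created, today)); // create date kept
        System.out.println(deriveStartDate("U", null, today));    // today + 1 day
        System.out.println(deriveStartDate("I", null, today));    // null
    }
}
```

Any row for which this derivation returns null must be filtered out (or given a default) before the write, since start_date is a clustering key and Cassandra rejects nulls there.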

When I start my consumer and there are no records in the Kafka topic, the streaming flow continuously calls the map function above.

Because record.getCreateDate() is null, start_date is left as null.

But start_date is part of the primary key in my C* table; hence the insertion fails, and Spark waits indefinitely, cannot recover, and cannot save data into the C* table.

Part 1: What should be done to fix it? Any clue, please?

Part 2:

  1. How to recover from failure?

latestRecords
  .writeStream()
  .foreachBatch((batchDf, batchId) -> {
      batchDf
        .write()
        .format("org.apache.spark.sql.cassandra")
        .option("table", "company")
        .option("keyspace", "ks_1")
        .mode(SaveMode.Append)
        .save();
  })
  .start()
  .awaitTermination();

I am using the above Java API, and I don't find an equivalent method to check "isEmpty" on an RDD in Java.

Any clue how to handle this in Java?

Part 3:

I tried this:

.foreachBatch((batchDf, batchId) -> {
    System.out.println("latestRecords batchDf.isEmpty : "
        + batchDf.isEmpty() + "\t length : " + batchDf.rdd().getPartitions().length);
})

This gives the output:

latestRecords batchDf.isEmpty : false    length : 6

So how do I check for an empty batch, given that isEmpty returns false here?

Part 4:

When I start the consumer, there is no data available in the topic. Even though the dataset shows no data, count shows 3, as in the output below. How is that possible?

If I try this:

.foreachBatch((batchDf, batchId) -> {
    System.out.println("latestRecords batchDf.rdd().count : " + batchDf.rdd().count()
        + "\t batchDf.count : " + batchDf.count());
})

the output is:

latestRecords batchDf.rdd().count : 3    batchDf.count :3

Upvotes: 0

Views: 673

Answers (1)

Michael Heil

Reputation: 18515

You are facing a common problem for Spark Streaming applications. When there is no data in the source (in your case a Kafka topic), Spark creates an empty RDD. You can validate whether an RDD is empty by adding

if (!rdd.isEmpty())

before calling your method mapFunInsertCompany.
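For the Java foreachBatch case from Part 2, one way to apply this check is to guard the Cassandra write with Dataset's own isEmpty() (available since Spark 2.4, so no RDD conversion is needed). This is a sketch only, reusing the names from the question (latestRecords, ks_1, company), not tested here:

```java
latestRecords
  .writeStream()
  .foreachBatch((batchDf, batchId) -> {
      // Skip the write entirely for empty micro-batches, so no rows
      // with a null start_date ever reach Cassandra.
      if (!batchDf.isEmpty()) {
          batchDf
            .write()
            .format("org.apache.spark.sql.cassandra")
            .option("table", "company")
            .option("keyspace", "ks_1")
            .mode(SaveMode.Append)
            .save();
      }
  })
  .start()
  .awaitTermination();
```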

Please also have a look at this blog post.

Upvotes: 1
