ohallc

Reputation: 103

Persisting Spark Streaming output

I'm collecting data from a messaging app (currently using Flume), which sends approximately 50 million records per day.

I wish to move to Kafka, consume from Kafka using Spark Streaming, persist the data to Hadoop, and query it with Impala.

I'm having issues with each approach I've tried.

Approach 1 - Save the RDD as Parquet and point an external Hive Parquet table at the Parquet directory

// scala
val ssc = new StreamingContext(sparkConf, Seconds(bucketsize.toInt))
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
lines.foreachRDD(rdd => {

    // 1 - create a SchemaRDD from the RDD, applying the schema
    val SchemaRDD1 = sqlContext.jsonRDD(rdd, schema)

    // 2 - register it as a Spark SQL table
    SchemaRDD1.registerTempTable("sparktable")

    // 3 - query sparktable to produce another SchemaRDD, 'finalParquet',
    //     holding only the data needed, and persist it as Parquet files
    val finalParquet = sqlContext.sql(sql)
    finalParquet.saveAsParquetFile(dir)
})

The problem is that finalParquet.saveAsParquetFile outputs a huge number of files: the DStream received from Kafka produces over 200 files for a 1-minute batch size. It outputs many files because the computation is distributed, as explained in another post: how to make saveAsTextFile NOT split output into multiple file?

However, the proposed solutions don't seem optimal to me; for example, as one user states, having a single output file is only a good idea if you have very little data.
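For illustration, here is a minimal sketch of that coalesce workaround applied to Approach 1, assuming a Spark version where the result of sqlContext.sql supports coalesce (a DataFrame in Spark 1.4+); the partition count of 4 is an arbitrary example, not a recommendation:

// scala
// inside lines.foreachRDD(rdd => { ... }) from Approach 1
val finalParquet = sqlContext.sql(sql)

// cap the number of output files: each partition becomes one Parquet
// part-file, so writing from 4 partitions yields at most 4 files
// (4 is an arbitrary example value; tune it to your batch volume)
finalParquet.coalesce(4).saveAsParquetFile(dir)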

Approach 2 - Use HiveContext and insert the RDD data directly into a Hive table

# python
sqlContext = HiveContext(sc)
ssc = StreamingContext(sc, int(batch_interval))
kvs = KafkaUtils.createStream(ssc, zkQuorum, group, {topics: 1})

def sendRecord(rdd):

    sql = "INSERT INTO TABLE table select * from beacon_sparktable"

    # 1 - apply the schema to the RDD, creating a DataFrame 'beaconDF'
    beaconDF = sqlContext.jsonRDD(rdd, schema)

    # 2 - register the DataFrame as a Spark SQL table
    beaconDF.registerTempTable("beacon_sparktable")

    # 3 - insert into Hive directly from a query on the Spark SQL table
    sqlContext.sql(sql)

# sendRecord must be defined before it is referenced here
lines = kvs.map(lambda x: x[1]).persist(StorageLevel.MEMORY_AND_DISK_SER)
lines.foreachRDD(sendRecord)

This works fine; it inserts directly into a Parquet table. But there are scheduling delays for the batches, because processing time exceeds the batch interval: the consumer can't keep up with what's being produced, and the batches to process begin to queue up.

It seems that writing to Hive is slow. I've tried adjusting the batch interval size and running more consumer instances.

In summary

What is the best way to persist big data from Spark Streaming, given the issues with multiple files and the potential latency of writing to Hive? What are other people doing?

A similar question has been asked here, but he has an issue with directories as opposed to too many files: How to make Spark Streaming write its output so that Impala can read it?

Many thanks for any help.

Upvotes: 9

Views: 8681

Answers (2)

Piyush Patel

Reputation: 1751

I think the small-file problem can be resolved somewhat. You may be getting a large number of files based on the number of Kafka partitions. For example, I have a 12-partition Kafka topic, and I write using spark.write.mode("append").parquet("/location/on/hdfs").

Now, depending on your requirements, you can add coalesce(1) (or a larger value) to reduce the number of files, as sketched below. Another option is to increase the micro-batch duration: for example, if you can accept a 5-minute delay in writing the data, you can use a micro-batch of 300 seconds.
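As a rough sketch of that pattern (the df DataFrame and the HDFS path are placeholders, not a definitive setup):

// scala
// write one micro batch: coalesce first so the batch emits few files,
// then append the Parquet files under a single location on HDFS
df.coalesce(1)                   // 1 output file per batch; raise for large batches
  .write
  .mode("append")
  .parquet("/location/on/hdfs")  // path taken from the answer above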

For the second issue, the batches queue up only because you don't have backpressure enabled. First, verify the maximum number of records you can process in a single batch. Once you have that number, set spark.streaming.kafka.maxRatePerPartition and spark.streaming.backpressure.enabled=true to limit the number of records per micro-batch (see the sketch below). If you still cannot meet the demand, the only options are to increase the partitions on the topic or to increase the resources of the Spark application.
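A minimal sketch of those two settings on the SparkConf; the rate of 10000 records/sec/partition is an assumed example value, to be replaced with the per-batch capacity you measured:

// scala
import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("kafka-to-hive")  // placeholder app name
  // let Spark adapt the receiving rate to the actual processing speed
  .set("spark.streaming.backpressure.enabled", "true")
  // hard upper bound on records read per second from each Kafka partition
  // (10000 is an assumed example value)
  .set("spark.streaming.kafka.maxRatePerPartition", "10000")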

Upvotes: 0

ecesena

Reputation: 1165

In solution #2, the number of files created can be controlled via the number of partitions of each RDD.

See this example:

// imports needed for the schema and Row (not shown in the original)
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType}

// create a Hive table (or assume it already exists)
sqlContext.sql("CREATE TABLE test (id int, txt string) STORED AS PARQUET")

// create an RDD with 2 records and only 1 partition
val rdd = sc.parallelize(List( List(1, "hello"), List(2, "world") ), 1)

// create a DataFrame from the RDD, applying an explicit schema
val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("txt", StringType, nullable = false)
))
val df = sqlContext.createDataFrame(rdd.map( Row(_: _*) ), schema)

// this creates a single file, because the RDD has 1 partition
df.write.mode("append").saveAsTable("test")

Now, I guess you can play with the frequency at which you pull data from Kafka, and with the number of partitions of each RDD (by default, the number of partitions of your Kafka topic, which you can reduce by repartitioning); see the sketch below.
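Applied to the streaming insert from solution #2, that could look like the following sketch (the target of 2 partitions is an arbitrary assumption; the table and query come from the question):

// scala
lines.foreachRDD { rdd =>
  // shrink the RDD to a fixed number of partitions before building the
  // DataFrame, so the subsequent insert writes that many files per batch
  // (2 is an arbitrary example value)
  val df = sqlContext.jsonRDD(rdd.coalesce(2), schema)
  df.registerTempTable("beacon_sparktable")
  sqlContext.sql("INSERT INTO TABLE table SELECT * FROM beacon_sparktable")
}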

I'm using Spark 1.5 from CDH 5.5.1, and I get the same result whether I use df.write.mode("append").saveAsTable("test") or your SQL string.

Upvotes: 0
