Yoni Darash

Reputation: 1

How to deserialize Flume's Avro events coming to Spark?

I have a Flume Avro sink and a Spark Streaming program that reads from the sink. CDH 5.1, Flume 1.5.0, Spark 1.0, using Scala as the programming language on Spark.

I was able to run the Spark example and count the Flume Avro events.

However, I was not able to deserialize the Flume Avro event into a string/text and then parse the structured row.

Does anyone have an example of how to do so using Scala?

Upvotes: 0

Views: 1613

Answers (3)

sachin thirumala

Reputation: 51

You can deserialize the Flume events with the code below:

val eventBody = stream.map(e => new String(e.event.getBody.array))

Here's an example of a Spark Streaming application that analyzes popular hashtags from Twitter, using a Flume Twitter source and an Avro sink to push the events to Spark:

import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.flume._

object PopularHashTags {

  val conf = new SparkConf().setMaster("local[4]").setAppName("PopularHashTags").set("spark.executor.memory", "1g")
  val sc = new SparkContext(conf)

  def main(args: Array[String]) {

    sc.setLogLevel("WARN")

    System.setProperty("twitter4j.oauth.consumerKey", <consumerKey>)
    System.setProperty("twitter4j.oauth.consumerSecret", <consumerSecret>)
    System.setProperty("twitter4j.oauth.accessToken", <accessToken>)
    System.setProperty("twitter4j.oauth.accessTokenSecret", <accessTokenSecret>)

    val ssc = new StreamingContext(sc, Seconds(5))
    val stream = FlumeUtils.createStream(ssc, <hostname>, <port>)

    // Deserialize each Flume event body into a String
    val tweets = stream.map(e => new String(e.event.getBody.array))

    val hashTags = tweets.flatMap(status => status.split(" ").filter(_.startsWith("#")))

    val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    // Print popular hashtags
    topCounts60.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    stream.count().map(cnt => "Received " + cnt + " flume events.").print()

    ssc.start()
    ssc.awaitTermination()
  }

}

Upvotes: 1

Charles.w

Reputation: 1

Try the code below:

stream.map(e => "Event: header: " + e.event.get(0).toString
                + " body: " + new String(e.event.getBody.array)).print()
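A more readable variant is to walk the headers map explicitly. This is a sketch: `formatEvent` is a hypothetical helper, not part of the Flume or Spark API; it only relies on `AvroFlumeEvent` exposing its headers as a `java.util.Map[CharSequence, CharSequence]` via `getHeaders` and its body via `getBody`:

```scala
import scala.collection.JavaConverters._

// Hypothetical helper (an assumption, not library API): renders a Flume
// event's headers and body as one line of text.
def formatEvent(headers: java.util.Map[CharSequence, CharSequence],
                body: Array[Byte]): String = {
  val hs = headers.asScala.map { case (k, v) => s"$k=$v" }.mkString(", ")
  "Event: headers: [" + hs + "] body: " + new String(body, "UTF-8")
}
```

With the stream from `FlumeUtils.createStream`, usage would look like `stream.map(e => formatEvent(e.event.getHeaders, e.event.getBody.array)).print()`.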

Upvotes: 0

user3823859
user3823859

Reputation: 469

You can implement a custom decoder in order to deserialize. Provide the expected type information along with it.
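As a minimal sketch of that idea (the row layout and the `LogRow` case class are assumptions, since the question does not show the event schema), a custom decoder can turn each event body into a typed record:

```scala
// Hypothetical example: assumes each Flume event body is a comma-separated
// row of (timestamp, level, message). LogRow and decodeBody are
// illustrations, not part of any Flume or Spark API.
case class LogRow(timestamp: String, level: String, message: String)

def decodeBody(bytes: Array[Byte]): Option[LogRow] = {
  val text = new String(bytes, "UTF-8")
  text.split(",", 3) match {
    case Array(ts, lvl, msg) => Some(LogRow(ts.trim, lvl.trim, msg.trim))
    case _                   => None // drop malformed rows
  }
}
```

In the streaming job this would be applied as `stream.flatMap(e => decodeBody(e.event.getBody.array))`, yielding a `DStream[LogRow]` with malformed events filtered out.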

Upvotes: 0
