Harun Zengin

Reputation: 43

Spark w Kafka - can't get enough parallelization

I am running Spark with the local[8] configuration. The input is a Kafka stream with 8 brokers, but as seen in the system monitor, the work isn't parallel enough; it seems that only about one core is actually busy. The input to the Kafka stream is about 1.6 GB, so it should be processed much faster.

(screenshot: system monitor)

Kafka Producer:

import java.io.{BufferedReader, FileReader}
import java.util
import java.util.{Collections, Properties}

import logparser.LogEvent
import org.apache.hadoop.conf.Configuration
import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerRecord}
import org.apache.kafka.common.serialization.StringDeserializer


object sparkStreaming{

  private val NUMBER_OF_LINES = 100000000

  val brokers ="localhost:9092,localhost:9093,localhost:9094,localhost:9095,localhost:9096,localhost:9097,localhost:9098,localhost:9099"
  val topicName = "log-1"
  val fileName = "data/HDFS.log"
  val producer = getProducer()

  // no HDFS; read from a text file
  def produce(): Unit = {
    try { //1. Get the instance of Configuration
      val configuration = new Configuration

      val fr = new FileReader(fileName)
      val br = new BufferedReader(fr)
      var line = br.readLine
      var count = 1

      while (line != null && count < NUMBER_OF_LINES) {
        System.out.println("Sending batch " + count + "  " + line)
        producer.send(new ProducerRecord[String, LogEvent](topicName, new LogEvent(count, line, System.currentTimeMillis())))
        line = br.readLine
        count = count + 1
      }
      producer.close()
      System.out.println("Producer exited successfully for " + fileName)
    } catch {
      case e: Exception =>
        System.out.println("Exception while producing for " + fileName)
        System.out.println(e)
    }
  }

  // Create a Properties instance to hold the producer configs.
  private def getProducer(): KafkaProducer[String, LogEvent] = {
    val props = new Properties
    // Assign the broker list
    props.put("bootstrap.servers", brokers)
    props.put("auto.create.topics.enable", "true")
    // Set acknowledgements for producer requests.
    props.put("acks", "all")
    // If a request fails, the producer can automatically retry.
    props.put("retries", "100")
    // Specify the batch size in the config.
    props.put("batch.size", "16384")
    // linger.ms adds a small delay so records are batched instead of sent one request at a time.
    props.put("linger.ms", "1")
    // buffer.memory controls the total amount of memory available to the producer for buffering.
    props.put("buffer.memory", "33554432")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "logparser.LogEventSerializer")
    props.put("topic.metadata.refresh.interval.ms", "1")
    val producer = new KafkaProducer[String, LogEvent](props)
    producer
  }

  def sendBackToKafka(logEvent: LogEvent): Unit = {
    producer.send(new ProducerRecord[String, LogEvent]("times", logEvent))
  }


  def main (args: Array[String]): Unit = {
    println("Starting to produce");
    this.produce();
  }
}

Consumer:

package logparser


import java.io._
import java.util.Properties

import kafka.serializer.StringDecoder
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._

object consumer {

  var tFromKafkaToSpark: Long = 0
  var tParsing: Long = 0
  val startTime = System.currentTimeMillis()
  val CPUNumber = 8

  val pw = new PrintWriter(new FileOutputStream("data/Streaming" + CPUNumber + "config2x.txt", false))
  pw.write("Writing Started")

  def printstarttime(): Unit = {
    pw.print("StartTime : " + System.currentTimeMillis())
  }

  def printendtime(): Unit = {
    pw.print("EndTime : " + System.currentTimeMillis())
  }

  val producer = getProducer()


  // Create a Properties instance to hold the producer configs.
  private def getProducer(): KafkaProducer[String, TimeList] = {
    val props = new Properties
    val brokers = "localhost:9090,"
    // Assign the broker list
    props.put("bootstrap.servers", brokers)
    props.put("auto.create.topics.enable", "true")
    // Set acknowledgements for producer requests.
    props.put("acks", "all")
    // If a request fails, the producer can automatically retry.
    props.put("retries", "100")
    // Specify the batch size in the config.
    props.put("batch.size", "16384")
    // linger.ms adds a small delay so records are batched instead of sent one request at a time.
    props.put("linger.ms", "1")
    // buffer.memory controls the total amount of memory available to the producer for buffering.
    props.put("buffer.memory", "33554432")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "logparser.TimeListSerializer")
    props.put("topic.metadata.refresh.interval.ms", "1")
    val producer = new KafkaProducer[String, TimeList](props)
    producer
  }

  def sendBackToKafka(timeList: TimeList): Unit = {
    producer.send(new ProducerRecord[String, TimeList]("times", timeList))
  }

  def main(args: Array[String]) {

    val topics = "log-1"
    //val Array(brokers, ) = Array("localhost:9092","log-1")
    val brokers = "localhost:9092"

    // Create context with a 1 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[" + CPUNumber + "]")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    var kafkaParams = Map[String, AnyRef]("metadata.broker.list" -> brokers)
    kafkaParams = kafkaParams + ("bootstrap.servers" -> "localhost:9092,localhost:9093,localhost:9094,localhost:9095,localhost:9096,localhost:9097,localhost:9098,localhost:9099")
    kafkaParams = kafkaParams + ("auto.offset.reset" -> "latest")
    kafkaParams = kafkaParams + ("group.id" -> "test-consumer-group")
    kafkaParams = kafkaParams + ("key.deserializer" -> classOf[StringDeserializer])
    kafkaParams = kafkaParams + ("value.deserializer" -> "logparser.LogEventDeserializer")
    //kafkaParams.put("zookeeper.connect", "192.168.101.165:2181");
    kafkaParams = kafkaParams + ("enable.auto.commit" -> "true")
    kafkaParams = kafkaParams + ("auto.commit.interval.ms" -> "1000")
    kafkaParams = kafkaParams + ("session.timeout.ms" -> "20000")
    kafkaParams = kafkaParams + ("metadata.max.age.ms" -> "1000")

    val messages = KafkaUtils.createDirectStream[String, LogEvent](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, LogEvent](topicsSet, kafkaParams))

    var started = false
    val lines = messages.map(_.value)
    val lineswTime = lines.map(event => {
      event.addNextEventTime(System.currentTimeMillis())
      event
    })

    lineswTime.foreachRDD(a => a.foreach(e => println(e.getTimeList)))

    val logLines = lineswTime.map(event => {
      //println(event.getLogline.stringMessages.toString)
      event.setLogLine(event.getContent)
      println("Got event with id   =  " + event.getId)
      event.addNextEventTime(System.currentTimeMillis())
      println(event.getLogline.stringMessages.toString)
      event
    })
    //logLines.foreachRDD(a => a.foreach(e => println(e.getTimeList + e.getLogline.stringMessages.toString)))

    val x = logLines.map(le => {
      le.addNextEventTime(System.currentTimeMillis())
      sendBackToKafka(new TimeList(le.getTimeList))
      le
    })

    x.foreachRDD(a => a.foreach(e => println(e.getTimeList)))

    //logLines.map(ll => ll.addNextEventTime(System.currentTimeMillis()))
    println("--------------***///*****-------------------")

    //logLines.print(10)
    /*
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()
    */

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
    ssc.stop(false)

    pw.close()
  }
}

Upvotes: 0

Views: 286

Answers (2)

nivox

Reputation: 2130

There's a piece of information missing in your problem statement: how many partitions does your input topic log-1 have?

My guess is that the topic has fewer than 8 partitions.

The parallelism of Spark Streaming (in the case of a Kafka source) is tied (modulo re-partitioning) to the total number of Kafka partitions it consumes (i.e. the RDDs' partitions are taken from the Kafka partitions).

If, as I suspect, your input topic only has a few partitions, then for each micro-batch Spark Streaming will task only that many nodes with the computation. All the other nodes will sit idle.
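You can verify the partition count with the kafka-topics command-line tool (--describe) or programmatically. Below is a minimal sketch using the Kafka AdminClient, assuming kafka-clients 0.11+ on the classpath and the localhost:9092 broker from your producer config; the object name is just a placeholder:

import java.util.{Collections, Properties}

import org.apache.kafka.clients.admin.AdminClient

object CheckPartitions {
  def main(args: Array[String]): Unit = {
    val props = new Properties
    props.put("bootstrap.servers", "localhost:9092") // assumed broker address from the question
    val admin = AdminClient.create(props)
    // describeTopics(...).all().get() blocks until the topic metadata arrives
    val description = admin.describeTopics(Collections.singletonList("log-1")).all().get()
    println("log-1 has " + description.get("log-1").partitions().size() + " partition(s)")
    admin.close()
  }
}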

The fact that you see all the nodes working in an almost round-robin fashion is because Spark does not always choose the same node to process data from the same partition; it actively mixes things up.

To get a better idea of what's happening, I'd need to see some statistics from the Spark UI Streaming page.

Given the information you have provided so far, however, insufficient Kafka partitioning would be my best bet for this behaviour.
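If it does turn out that log-1 has fewer than 8 partitions, one fix is to grow the topic (and restart the streaming job so it picks up the new partitions). A sketch of doing that with the AdminClient, again assuming kafka-clients 1.0+ and the localhost:9092 bootstrap address (the same can be done with kafka-topics.sh --alter); the object name is a placeholder:

import java.util.{Collections, Properties}

import org.apache.kafka.clients.admin.{AdminClient, NewPartitions}

object GrowTopic {
  def main(args: Array[String]): Unit = {
    val props = new Properties
    props.put("bootstrap.servers", "localhost:9092") // assumed broker address
    val admin = AdminClient.create(props)
    // Raise log-1 to 8 partitions so each of the local[8] cores can own one.
    // Partition counts can only ever be increased, never decreased.
    admin.createPartitions(Collections.singletonMap("log-1", NewPartitions.increaseTo(8))).all().get()
    admin.close()
  }
}

Note that records already written stay on the old partitions; only new records are spread across all 8.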

Upvotes: 4

Gauthier Feuillen

Reputation: 184

Everything consuming from Kafka is limited by the number of partitions your topic(s) have: one consumer per partition. How many do you have?

Although Spark can redistribute the work (see the sketch below), it's not recommended, as you might spend more time exchanging data between executors than actually processing it.
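If you decide to redistribute anyway, the usual way is to repartition the stream right after reading from Kafka. A minimal sketch against the consumer code in the question, reusing its messages stream and CPUNumber constant (keeping in mind that this shuffles every record between tasks):

// Drop-in change for the consumer's main() in the question:
// spread records over all CPUNumber local cores before the heavier map stages.
val lines = messages.map(_.value).repartition(CPUNumber)
val lineswTime = lines.map(event => {
  event.addNextEventTime(System.currentTimeMillis())
  event
})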

Upvotes: 0
