Jake

Reputation: 4660

Ephemeral Spark Streaming: shut down programmatically

Background

I am new to Spark Streaming and a fairly novice Scala and Spark user.

ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Connection Refused

There is an end-of-stream signal that is read by the DStream. I have confirmed that it is received and parsed.

Problem

However, despite having read the documentation, I am unable to find a way to shut down the StreamingContext programmatically. Indeed, I have read online that StreamingContext.stop(true, true) can lead to problems.

My code is below. Any help would be deeply appreciated.

(NOTE: logger.info("Stopping") is never logged to file)

var stop = false

@throws(classOf[IKodaMLException])
def startStream(ip: String, port: Int): Unit = {
  try {
    val ssc = getSparkStreamingContext(fieldVariables)
    ssc.checkpoint("./ikoda/cp")

    val lines = ssc.socketTextStream(ip, port, StorageLevel.MEMORY_AND_DISK_SER)
    lines.print()

    val lmap = lines.map {
      l =>
        if (l.contains("IKODA_END_STREAM")) {
          stop = true
        }
        // ..... do stuff and return processed line
        l
    }

    if (stop) {
      logger.info("Stopping")
      ssc.stop(true, true)
    }

    lmap.foreachRDD {
      r =>
        if (r.count() > 0) {
          // ....... do more stuff
        }
        else {
          logger.info("Empty RDD. No data received")
        }
    }
    ssc.start()
    ssc.awaitTermination()
  }
  catch {
    case e: Exception =>
      logger.error(e.getMessage, e)
      throw new IKodaMLException(e.getMessage, e)
  }
}

Upvotes: 0

Views: 656

Answers (1)

Jake

Reputation: 4660

UPDATE: This answer seems to be factually correct, but conceptually wrong. I think a better answer is provided in this post.


This answer is a bit preliminary. It answers the question but does not solve the problem. Other input is most welcome.

  • First, the documentation states that shutting down programmatically is OK. In practice, though, I have noticed one or two connection-related exceptions thrown before shutdown, and even when the SparkContext is told to shut down along with the stream, it does not seem to do so. So shutting down programmatically seems unwise; unless I can restart the stream, the project is moot.

  • Second, the only code that is applied to the StreamingContext during streaming is code that directly references the DStream, so the stop() call in the code in the question above was clearly wrong.

  • Third, streaming (as I understand it) does occur on the driver, so field variables can be created and referenced inside DStream loops, maps, etc.

  • It is possible, however, to create a Thread that monitors a field-level boolean for a shutdown request and then calls stop() on the StreamingContext.
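(As an aside: if the goal were only a graceful stop when the JVM exits, rather than an in-stream stop, Spark exposes the spark.streaming.stopGracefullyOnShutdown configuration flag for its shutdown hook. A sketch of setting it is below; the app name is a placeholder and I have not verified this flag solves the in-stream case.)

```scala
import org.apache.spark.SparkConf

// Ask Spark's shutdown hook to stop the StreamingContext gracefully on JVM exit.
val conf = new SparkConf()
  .setAppName("ikoda-stream") // placeholder app name
  .set("spark.streaming.stopGracefullyOnShutdown", "true")
```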

The Thread

// @volatile so the monitor thread is guaranteed to see the flag update
@volatile var stopScc = false

private def stopSccThread(): Unit = {
  val thread = new Thread {
    override def run(): Unit = {
      var continueRun = true
      while (continueRun) {
        logger.debug("Checking status")
        if (stopScc) {
          getSparkStreamingContext(fieldVariables).stop(true, true)
          logger.info("Called Stop on Streaming Context")
          continueRun = false
        }
        Thread.sleep(50)
      }
    }
  }
  thread.start()
}
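A note on thread visibility: without @volatile (or another memory barrier), the monitor thread may never observe a write to stopScc made from another thread. Below is a minimal, Spark-free sketch of the same monitor pattern; FlagMonitorSketch, stopCalled, and startMonitor are hypothetical names introduced only for illustration, with stopCalled standing in for the real ssc.stop(true, true) call.

```scala
object FlagMonitorSketch {
  // @volatile guarantees the monitor thread sees writes made by other threads
  @volatile var stopScc = false
  @volatile var stopCalled = false // stands in for ssc.stop(true, true)

  // Launch the polling thread, mirroring stopSccThread() above
  def startMonitor(): Thread = {
    val thread = new Thread {
      override def run(): Unit = {
        while (!stopScc) {
          Thread.sleep(10) // poll the flag, as the monitor thread above does
        }
        stopCalled = true // where the real code would stop the StreamingContext
      }
    }
    thread.start()
    thread
  }
}
```

Setting the flag from any thread (e.g. inside the DStream map) lets the monitor exit its loop and perform the stop.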

The Stream

@throws(classOf[IKodaMLException])
def startStream(ip: String, port: Int): Unit = {
  try {
    val ssc = getSparkStreamingContext(fieldVariables)
    ssc.checkpoint("./ikoda/cp")

    val lines = ssc.socketTextStream(ip, port, StorageLevel.MEMORY_AND_DISK_SER)
    lines.print()

    val lmap = lines.map {
      l =>
        if (l.contains("IKODA_END_STREAM")) {
          stopScc = true
        }
        l
    }

    lmap.foreachRDD {
      r =>
        if (r.count() > 0) {
          logger.info(s"RECEIVED: ${r.toString()} first: ${r.first().toString}")
          r.saveAsTextFile("./ikoda/test/test")
        }
        else {
          logger.info("Empty RDD. No data received")
        }
    }
    ssc.start()
    ssc.awaitTermination()
  }
  catch {
    case e: Exception =>
      logger.error(e.getMessage, e)
      throw new IKodaMLException(e.getMessage, e)
  }
}
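An alternative pattern I have since seen suggested keeps the check on the driver's main thread by polling StreamingContext.awaitTerminationOrTimeout instead of spawning a monitor thread. The sketch below abstracts that loop so it runs without Spark; runUntilStopped, shouldStop, awaitOrTimeout, and stop are hypothetical names, where awaitOrTimeout stands in for ssc.awaitTerminationOrTimeout(pollMs) and stop for ssc.stop(true, true).

```scala
// Poll until the context terminates; request a stop once the flag flips.
// In real code: shouldStop() reads the stop flag, awaitOrTimeout(ms) is
// ssc.awaitTerminationOrTimeout(ms), and stop() is ssc.stop(true, true).
def runUntilStopped(shouldStop: () => Boolean,
                    awaitOrTimeout: Long => Boolean,
                    stop: () => Unit,
                    pollMs: Long = 1000L): Unit = {
  var terminated = false
  while (!terminated) {
    terminated = awaitOrTimeout(pollMs) // true once the context has stopped
    if (!terminated && shouldStop()) {
      stop() // request a graceful stop; a later poll then returns true
    }
  }
}
```

The advantage is that stop() is invoked from the driver's main thread rather than a side thread, which keeps the shutdown sequencing in one place.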

Upvotes: 0
