Vishal
Vishal

Reputation: 1492

'Connection refused' exception while using streaming query

I am trying to read streaming data input as follows

object SocketReadExample {

    def main(args: Array[String]): Unit = {

      val sparkSession = SparkSession.builder
        .master("local")
        .appName("example")
        .config("spark.driver.bindAddress", "127.0.0.1")
        .getOrCreate()
      //create stream from socket
      val socketStreamDf = sparkSession.readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", 50050)
        .load()

      val consoleDataFrameWriter = socketStreamDf.writeStream
        .format("console")
        .outputMode(OutputMode.Append())

      val query = consoleDataFrameWriter.start()

      query.awaitTermination()
       }
   }

For which I am facing following errors :

 Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Connection
 refused
 === Streaming Query ===
 Identifier: [id = 2bdde43c-319d-48fc-941a-e8d794294a1d, runId = 8b1fd51e-b610-497b-b903-d66367856302]
 Current Committed Offsets: {}
 Current Available Offsets: {}

 Current State: INITIALIZING
 Thread State: RUNNABLE
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:343)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
 Caused by: java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at java.net.Socket.connect(Socket.java:538)
    at java.net.Socket.<init>(Socket.java:434)
    at java.net.Socket.<init>(Socket.java:211)

Upvotes: 3

Views: 2897

Answers (2)

yuhao yao
yuhao yao

Reputation: 51

I have matched this problem before.you should open the port before you start your program,like this:

nc -lk 50050

then it will be OK.

Upvotes: 5

user2751227
user2751227

Reputation: 1

I just had the same problem, you gived me the idea of checking spark driver config and i solved it by setting host and port like this.

val session: SparkSession = SparkSession.builder()
  .appName("Spark example")
  .master("local[2]")
  .config("spark.driver.host", "127.0.0.1")
  .config("spark.driver.port", "9999")
  .config("spark.testing.memory", "2147480000")
  .getOrCreate()

  ...

  val query = consoleDataFrameWriter.start()
  query.awaitTermination()

Upvotes: -1

Related Questions