Reputation: 1492
I am trying to read streaming input data from a socket as follows:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

object SocketReadExample {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .master("local")
      .appName("example")
      .config("spark.driver.bindAddress", "127.0.0.1")
      .getOrCreate()

    // create stream from socket
    val socketStreamDf = sparkSession.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 50050)
      .load()

    val consoleDataFrameWriter = socketStreamDf.writeStream
      .format("console")
      .outputMode(OutputMode.Append())

    val query = consoleDataFrameWriter.start()
    query.awaitTermination()
  }
}
This fails with the following error:
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Connection refused
=== Streaming Query ===
Identifier: [id = 2bdde43c-319d-48fc-941a-e8d794294a1d, runId = 8b1fd51e-b610-497b-b903-d66367856302]
Current Committed Offsets: {}
Current Available Offsets: {}
Current State: INITIALIZING
Thread State: RUNNABLE
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:343)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
Caused by: java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at java.net.Socket.connect(Socket.java:538)
at java.net.Socket.<init>(Socket.java:434)
at java.net.Socket.<init>(Socket.java:211)
Upvotes: 3
Views: 2897
Reputation: 51
I have hit this problem before. The socket source connects to an already-running server, so you should open the port before you start your program, like this:
nc -lk 50050
Then it will work.
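If nc is not available (for example on Windows), a plain JVM socket server can play the same role. This is only a rough sketch (the object name TestSocketServer and the test lines are my own); start it first, then start the Spark job:

import java.net.ServerSocket
import java.io.PrintStream

// Minimal stand-in for `nc -lk 50050`: listens on the port and, once Spark
// connects, writes a few test lines that show up in the console sink.
object TestSocketServer {
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(50050)
    val socket = server.accept()                     // blocks until Spark connects
    val out = new PrintStream(socket.getOutputStream)
    for (i <- 1 to 10) {
      out.println(s"test line $i")
      out.flush()
      Thread.sleep(1000)
    }
    socket.close()
    server.close()
  }
}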
Upvotes: 5
Reputation: 1
I just had the same problem. You gave me the idea of checking the Spark driver config, and I solved it by setting the host and port like this:
val session: SparkSession = SparkSession.builder()
  .appName("Spark example")
  .master("local[2]")
  .config("spark.driver.host", "127.0.0.1")
  .config("spark.driver.port", "9999")
  .config("spark.testing.memory", "2147480000")
  .getOrCreate()
...
val query = consoleDataFrameWriter.start()
query.awaitTermination()
Upvotes: -1