Reputation: 1171
I am using Spark version 2.3.2.
I have written a Spark Structured Streaming job that inserts data from streaming DataFrames into two different MySQL tables.
Say there are two streaming DataFrames: DF1 and DF2.
I have written two queries (query1, query2) using the ForeachWriter API to write each stream into its own MySQL table, i.e. DF1 into MySQL table A and DF2 into MySQL table B.
When I run the Spark job, it first runs query1 and then query2, so it writes into table A but not into table B.
If I change my code to run query2 first and then query1, it writes into table B but not into table A.
So I understand that only the query that is started first actually writes to its table.
Note: I have tried giving the two tables different MySQL users/databases, but no luck.
Can anyone please advise how to make this work?
My code is below:
import java.sql._
import org.apache.spark.sql.{ForeachWriter, Row}
import org.apache.spark.sql.streaming.Trigger
class JDBCSink1(url: String, user: String, pwd: String) extends ForeachWriter[Row] {
  val driver = "com.mysql.jdbc.Driver"
  var connection: Connection = _

  // Open one JDBC connection per partition/epoch.
  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = DriverManager.getConnection(url, user, pwd)
    true
  }

  // Insert a single row into table A.
  def process(value: Row): Unit = {
    val insertSql = "INSERT INTO tableA (col1, col2, col3) VALUES (?, ?, ?)"
    val preparedStmt: PreparedStatement = connection.prepareStatement(insertSql)
    preparedStmt.setString(1, value(0).toString)
    preparedStmt.setString(2, value(1).toString)
    preparedStmt.setString(3, value(2).toString)
    preparedStmt.execute()
    preparedStmt.close()
  }

  def close(errorOrNull: Throwable): Unit = {
    if (connection != null) connection.close()
  }
}
class JDBCSink2(url: String, user: String, pwd: String) extends ForeachWriter[Row] {
  val driver = "com.mysql.jdbc.Driver"
  var connection: Connection = _

  // Open one JDBC connection per partition/epoch.
  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = DriverManager.getConnection(url, user, pwd)
    true
  }

  // Insert a single row into table B.
  def process(value: Row): Unit = {
    val insertSql = "INSERT INTO tableB (col1, col2) VALUES (?, ?)"
    val preparedStmt: PreparedStatement = connection.prepareStatement(insertSql)
    preparedStmt.setString(1, value(0).toString)
    preparedStmt.setString(2, value(1).toString)
    preparedStmt.execute()
    preparedStmt.close()
  }

  def close(errorOrNull: Throwable): Unit = {
    if (connection != null) connection.close()
  }
}
val url1 = "jdbc:mysql://hostname:3306/db1"
val url2 = "jdbc:mysql://hostname:3306/db2"
val user1 = "usr1"
val user2 = "usr2"
val pwd = "password"

val Writer1 = new JDBCSink1(url1, user1, pwd)
val Writer2 = new JDBCSink2(url2, user2, pwd)
val query2 =
  streamDF2
    .writeStream
    .foreach(Writer2)
    .outputMode("append")
    .trigger(Trigger.ProcessingTime("35 seconds"))
    .start()
    .awaitTermination()

val query1 =
  streamDF1
    .writeStream
    .foreach(Writer1)
    .outputMode("append")
    .trigger(Trigger.ProcessingTime("30 seconds"))
    .start()
    .awaitTermination()
Upvotes: 2
Views: 1363
Reputation: 18525
You are blocking the second query because of the awaitTermination call. If you want to have two output streams, you need to start both before waiting for their termination:
val query2 =
  streamDF2
    .writeStream
    .foreach(Writer2)
    .outputMode("append")
    .trigger(Trigger.ProcessingTime("35 seconds"))
    .start()

val query1 =
  streamDF1
    .writeStream
    .foreach(Writer1)
    .outputMode("append")
    .trigger(Trigger.ProcessingTime("30 seconds"))
    .start()

query1.awaitTermination()
query2.awaitTermination()
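Alternatively, if the driver should exit as soon as any one of the queries stops or fails, you can wait on the StreamingQueryManager instead of on each query individually. A minimal sketch:

// Blocks until any of the streaming queries on this SparkSession
// terminates, either normally or with an exception.
spark.streams.awaitAnyTermination()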
Edit:
Spark also allows you to schedule and allocate resources to the different streaming queries, as described in Scheduling Within an Application. You can configure your pools with FIFO or FAIR scheduling.
The pool configurations are set in an XML file, similar to conf/fairscheduler.xml.template. You either put a file named fairscheduler.xml on the classpath or set the spark.scheduler.allocation.file property in your SparkConf:
conf.set("spark.scheduler.allocation.file", "/path/to/file")
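For reference, a minimal fairscheduler.xml declaring the two pools used below could look like this (the pool names pool1 and pool2 are just the ones from this example):

<?xml version="1.0"?>
<allocations>
  <pool name="pool1">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>0</minShare>
  </pool>
  <pool name="pool2">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>0</minShare>
  </pool>
</allocations>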
Applying a different pool to each query is done by setting the spark.scheduler.pool local property on the thread before starting that query:

// Tell Spark to run query1 in pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
val query1 = streamDF1.writeStream.[...].start()

// Tell Spark to run query2 in pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
val query2 = streamDF2.writeStream.[...].start()
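Note that setLocalProperty is thread-local: each query should pick up whatever pool is set on the launching thread at the moment start() is called, which is why the property is set immediately before each start.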
Upvotes: 3