Reputation: 106
I am following https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/jdbc.html to use a MySQL database as a sink for Flink. The code compiles successfully, but executing the job on a Flink cluster fails with:
The program finished with the following exception:
The implementation of the AbstractJdbcOutputFormat is not serializable. The object probably contains or references non serializable fields.
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1899)
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:189)
org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1296)
org.apache.flink.streaming.api.scala.DataStream.addSink(DataStream.scala:1131)
Aggregator.Aggregator$.main(Aggregator.scala:81)
Here is the relevant part of the code:
object Aggregator {
  @throws[Exception]
  def main(args: Array[String]): Unit = {
    [...]
    val counts = stream.map { x => (
        x.get("value").get("id").asInt(),
        x.get("value").get("kpi").asDouble()
      )}
      .keyBy(0)
      .timeWindow(Time.seconds(60))
      .sum(1)

    counts.print()

    val statementBuilder: JdbcStatementBuilder[(Int, Double)] = (ps: PreparedStatement, t: (Int, Double)) => {
      ps.setInt(1, t._1);
      ps.setDouble(2, t._2);
    };

    val connection = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
      .withDriverName("mysql.Driver")
      .withPassword("XXX")
      .withUrl("jdbc:mysql://<DB_HOST>:3306/<DB_NAME>")
      .withUsername("<USERNAME>")
      .build();

    val jdbcSink = JdbcSink.sink(
      "INSERT INTO table (id, kpi) VALUES (?, ?)",
      statementBuilder,
      connection);

    counts.addSink(jdbcSink)
    env.execute("Aggregator")
  }
}
I am not sure which part of the code is the problem here or how to debug it. Unfortunately, I also cannot find a reference implementation of a JDBC sink in Scala. Any help is appreciated!
Upvotes: 2
Views: 1579
Reputation: 596
What worked for me was explicitly creating the JdbcStatementBuilder as an anonymous class instead of passing a Scala lambda. Something like:
val statementBuilder: JdbcStatementBuilder[(Int, Double)] =
  new JdbcStatementBuilder[(Int, Double)] {
    override def accept(ps: PreparedStatement, t: (Int, Double)): Unit = {
      ps.setInt(1, t._1)
      ps.setDouble(2, t._2)
    }
  }
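For context, a complete wiring could look roughly like the sketch below. It is only an illustration, not taken from the question: the JdbcSinkExample/kpi_table names, the sample stream, the batching values, and the com.mysql.cj.jdbc.Driver class name (MySQL Connector/J 8) are assumptions. JdbcSink.sink also accepts JdbcExecutionOptions if you want to control batching:

import java.sql.PreparedStatement

import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.scala._

object JdbcSinkExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Stand-in for the windowed (id, kpi) stream from the question.
    val counts: DataStream[(Int, Double)] = env.fromElements((1, 0.5), (2, 1.5))

    // Explicit anonymous class instead of a Scala lambda, so Flink's ClosureCleaner accepts it.
    val statementBuilder: JdbcStatementBuilder[(Int, Double)] =
      new JdbcStatementBuilder[(Int, Double)] {
        override def accept(ps: PreparedStatement, t: (Int, Double)): Unit = {
          ps.setInt(1, t._1)
          ps.setDouble(2, t._2)
        }
      }

    val jdbcSink = JdbcSink.sink[(Int, Double)](
      "INSERT INTO kpi_table (id, kpi) VALUES (?, ?)", // table name is a placeholder
      statementBuilder,
      JdbcExecutionOptions.builder()
        .withBatchSize(100)        // illustrative batching values
        .withBatchIntervalMs(200)
        .withMaxRetries(3)
        .build(),
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withDriverName("com.mysql.cj.jdbc.Driver") // MySQL Connector/J 8 driver class
        .withUrl("jdbc:mysql://<DB_HOST>:3306/<DB_NAME>")
        .withUsername("<USERNAME>")
        .withPassword("XXX")
        .build())

    counts.addSink(jdbcSink)
    env.execute("JdbcSinkExample")
  }
}

If the serialization error persists, it is worth checking that nothing else captured by the closure (for example the surrounding class or a connection object) is non-serializable.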
Upvotes: 2