Reputation: 2818
I am new to Apache Flink.
I am building a Flink operator that needs to fetch data from a relational database in order to process the streaming data. The lookups are small and quick, and I am using a Spring JDBC client (NamedParameterJdbcTemplate) to perform them.
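import javax.sql.DataSource;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;

public class FilterCriteriaEvaluator extends KeyedProcessFunction<Long, DeviceAttrUpdate, FilterCriteriaEval> {
    private NamedParameterJdbcTemplate namedParameterJdbcTemplate;

    public FilterCriteriaEvaluator(DataSource dataSource) {
        namedParameterJdbcTemplate = new NamedParameterJdbcTemplate(dataSource);
    }
    ...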
However, when I run this job on a cluster, I get the following error:
Caused by: java.io.NotSerializableException: org.springframework.jdbc.core.JdbcTemplate
The Spring JDBC client is not serializable. I then considered using javax.sql.DataSource directly, but it is not serializable either.
Marking the client field transient doesn't help: the field is then skipped when the operator object is serialized, so it is null on the cluster and I get a NullPointerException when the job runs.
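Both symptoms follow from ordinary Java serialization, which Flink uses to ship user functions to the cluster when a job is submitted. The stdlib-only sketch below reproduces them outside Flink. LookupClient, EagerFunction, TransientFunction and SerializationDemo are hypothetical names: LookupClient stands in for NamedParameterJdbcTemplate, and roundTrip only approximates what happens between job submission and the TaskManager.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for NamedParameterJdbcTemplate: it does NOT implement Serializable.
class LookupClient {
}

// Keeps the client in a regular field, like the question's FilterCriteriaEvaluator.
class EagerFunction implements Serializable {
    private static final long serialVersionUID = 1L;
    LookupClient client = new LookupClient();
}

// Keeps the client in a transient field: serialization skips it entirely.
class TransientFunction implements Serializable {
    private static final long serialVersionUID = 1L;
    transient LookupClient client = new LookupClient();
}

final class SerializationDemo {
    // Serializes and deserializes obj with Java serialization, roughly what happens
    // when a function is shipped from the client to the cluster.
    static Object roundTrip(Object obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        try {
            roundTrip(new EagerFunction());
        } catch (UncheckedIOException e) {
            // The cause is a java.io.NotSerializableException naming LookupClient.
            System.out.println("EagerFunction failed: " + e.getCause());
        }
        TransientFunction copy = (TransientFunction) roundTrip(new TransientFunction());
        // Field initializers do not run on deserialization, so the transient field stays null.
        System.out.println("TransientFunction.client = " + copy.client); // prints "TransientFunction.client = null"
    }
}
```

The null transient field is exactly the NullPointerException described above: something has to recreate the client after deserialization, on the TaskManager side.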
What am I missing here? How can I do database lookups from a Flink job?
Upvotes: 0
Views: 1096
Reputation: 43454
The mechanism built into Flink for exactly this use case is a lookup join in Flink SQL.
That would look something like this:
-- Customers is backed by the JDBC connector and can be used for lookup joins
CREATE TEMPORARY TABLE Customers (
  id INT,
  name STRING,
  country STRING,
  zip STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
  'table-name' = 'customers'
);
-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
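This assumes Orders declares a processing-time attribute (for example proc_time AS PROCTIME()), since FOR SYSTEM_TIME AS OF in a lookup join must refer to one. For more about the JDBC SQL connector, including its lookup cache options, see the docs.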
Upvotes: 1
Reputation: 1641
As a workaround, you can mark the JdbcTemplate field as transient and initialize it lazily, something like:
private transient JdbcTemplate instance = null;

// Not thread-safe
public JdbcTemplate getInstance() {
    if (instance == null) {
        // init
    }
    return instance;
}
and access it only through the getInstance() method.
However, this way you get a separate instance in each task slot (one per parallel instance of the function).
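A stdlib-only sketch of this lazy getter, assuming the function keeps only serializable configuration (here a JDBC URL string) as a regular field and builds the client on first use. JdbcClientStub, LazyLookupFunction and LazyDemo are hypothetical names, JdbcClientStub stands in for JdbcTemplate, and the URL is a placeholder; roundTrip mimics shipping the function to the cluster.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for JdbcTemplate: not Serializable.
class JdbcClientStub {
    private final String url;

    JdbcClientStub(String url) { this.url = url; }

    String url() { return url; }
}

// Only serializable configuration is a regular field; the client is transient and lazy.
class LazyLookupFunction implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String jdbcUrl;               // a String, so it survives serialization
    private transient JdbcClientStub instance;  // skipped by serialization

    LazyLookupFunction(String jdbcUrl) { this.jdbcUrl = jdbcUrl; }

    // Not thread-safe: assumes only this function instance's task thread calls it.
    JdbcClientStub getInstance() {
        if (instance == null) {
            instance = new JdbcClientStub(jdbcUrl);
        }
        return instance;
    }
}

final class LazyDemo {
    // Java serialization round trip, approximating job submission.
    static Object roundTrip(Object obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        LazyLookupFunction shipped =
            (LazyLookupFunction) roundTrip(new LazyLookupFunction("jdbc:example://host/db"));
        // The first call after deserialization builds the client from the serialized URL.
        System.out.println(shipped.getInstance().url()); // prints "jdbc:example://host/db"
    }
}
```

Storing connection settings (strings) instead of a DataSource is what makes this work: the settings serialize, and the client is rebuilt from them on the worker.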
To have a single instance per TaskManager, you can make it a static variable. In that case, however, you need to take care of thread safety and make the initialiser thread-safe.
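One way to make the static initialiser thread-safe is double-checked locking on a volatile field, sketched here with hypothetical names (SharedClientStub stands in for JdbcTemplate; SharedClientHolder and SharedDemo are illustrative). Strictly speaking, a static field is shared per class loader, so depending on how the job's classes are loaded this may be one instance per job on each TaskManager rather than one per TaskManager JVM.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for JdbcTemplate; counts how many times it is constructed.
class SharedClientStub {
    static final AtomicInteger CREATED = new AtomicInteger();
    final String url;

    SharedClientStub(String url) {
        this.url = url;
        CREATED.incrementAndGet();
    }
}

// One lazily created client per class loader, shared by all function instances.
final class SharedClientHolder {
    // volatile so other threads never observe a partially constructed client
    private static volatile SharedClientStub instance;

    private SharedClientHolder() {}

    // Double-checked locking: unsynchronized read on the fast path,
    // synchronized creation on first use. The first caller's URL wins.
    static SharedClientStub get(String url) {
        SharedClientStub local = instance;
        if (local == null) {
            synchronized (SharedClientHolder.class) {
                local = instance;
                if (local == null) {
                    local = new SharedClientStub(url);
                    instance = local;
                }
            }
        }
        return local;
    }
}

final class SharedDemo {
    // Calls SharedClientHolder.get from several threads at once and collects the results.
    static Set<SharedClientStub> getFromThreads(int threads, String url) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<SharedClientStub>> tasks = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                tasks.add(() -> SharedClientHolder.get(url));
            }
            Set<SharedClientStub> results = new HashSet<>();
            for (Future<SharedClientStub> f : pool.invokeAll(tasks)) {
                results.add(f.get());
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Set<SharedClientStub> seen = getFromThreads(8, "jdbc:example://host/db");
        System.out.println("distinct clients: " + seen.size());
        System.out.println("constructed: " + SharedClientStub.CREATED.get());
    }
}
```

Because the first caller's URL wins, functions that need different databases would need something like a ConcurrentHashMap keyed by URL, using computeIfAbsent, instead of a single field.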
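Alternatively, still mark the variable transient, but instead of adding a getInstance() method, extend RichFunction (or the rich variant of whatever function you are using) and initialise the template in the open(Configuration parameters) method, which is called once when the function is initialised and is the intended place for one-time setup work. KeyedProcessFunction already extends AbstractRichFunction, so in the question's case you can override open() directly.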
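The open()/close() approach can be sketched without Flink by calling the lifecycle methods by hand. EvaluatorSketch, ClientStub and LifecycleDemo are hypothetical names and the URL is a placeholder; in a real job the class would extend KeyedProcessFunction, override open(Configuration) and close(), and the Flink runtime would call them after deserializing the function on the TaskManager.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a JDBC client; not Serializable.
class ClientStub implements AutoCloseable {
    private final String url;
    private boolean closed;

    ClientStub(String url) { this.url = url; }

    // Pretend query: a real client would run SQL against the database here.
    String lookup(long deviceId) {
        if (closed) throw new IllegalStateException("client is closed");
        return url + "#" + deviceId;
    }

    boolean isClosed() { return closed; }

    @Override
    public void close() { closed = true; }
}

// Same shape as a Flink rich function: serializable config, transient resource
// created in open() and released in close(). Here the lifecycle methods are
// plain methods called by hand; in Flink the runtime invokes them.
class EvaluatorSketch implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String jdbcUrl;
    private transient ClientStub client;

    EvaluatorSketch(String jdbcUrl) { this.jdbcUrl = jdbcUrl; }

    void open() { client = new ClientStub(jdbcUrl); }

    // Stand-in for processElement(): relies on open() having run first.
    String process(long deviceId) { return client.lookup(deviceId); }

    void close() {
        if (client != null) client.close();
    }

    ClientStub client() { return client; }
}

final class LifecycleDemo {
    // Java serialization round trip, approximating job submission.
    static Object roundTrip(Object obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Order mirrors the runtime: deserialize, open, process elements, close.
        EvaluatorSketch fn =
            (EvaluatorSketch) roundTrip(new EvaluatorSketch("jdbc:example://host/db"));
        fn.open();
        System.out.println(fn.process(42L)); // prints "jdbc:example://host/db#42"
        fn.close();
    }
}
```

Compared with the lazy getter, open() removes the null check from the hot path, and close() gives a natural place to release connections.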
Upvotes: 1