Abhijith Madhav

How to look up DB data from a Flink job

I am new to Apache Flink.

I am building a Flink operator that needs to fetch data from a relational store in order to process the streaming data. Each lookup is small and quick. I used a spring-jdbc client to do the lookups.

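import javax.sql.DataSource;

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;

public class FilterCriteriaEvaluator extends KeyedProcessFunction<Long, DeviceAttrUpdate, FilterCriteriaEval> {

    // Built on the client in the constructor, then serialized together with the operator
    private NamedParameterJdbcTemplate namedParameterJdbcTemplate;

    public FilterCriteriaEvaluator(DataSource dataSource) {
        namedParameterJdbcTemplate = new NamedParameterJdbcTemplate(dataSource);
    }
    ...
}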

However, when I run this job on a cluster, I get the following error:

Caused by: java.io.NotSerializableException: org.springframework.jdbc.core.JdbcTemplate

The Spring DB client is not serializable. I then considered using javax.sql.DataSource directly, but it is not serializable either.

Marking the DB client transient doesn't help: it is then not serialized as part of the operator object, so the field is null when the job runs on the cluster and I get an NPE.
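
Both symptoms can be reproduced without Flink. The plain-Java sketch below mimics shipping an operator to a worker as a single Java serialization round trip (a simplification of what the Flink client and runtime actually do). `FakeDbClient`, `EagerOperator` and `TransientOperator` are hypothetical stand-ins, not Flink or Spring classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializationDemo {

    // Hypothetical stand-in for JdbcTemplate / DataSource: deliberately NOT Serializable.
    static class FakeDbClient {
        String lookup(long key) { return "row-" + key; }
    }

    // Operator that holds the client in a normal field (like the question's code).
    static class EagerOperator implements Serializable {
        private final FakeDbClient client = new FakeDbClient();
    }

    // Operator that marks the client transient but still creates it in a field initializer.
    static class TransientOperator implements Serializable {
        private transient FakeDbClient client = new FakeDbClient();

        FakeDbClient client() { return client; }
    }

    // Serialize and deserialize an object: a rough stand-in for shipping an operator to a worker.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    // True if shipping EagerOperator fails with NotSerializableException, as in the question.
    static boolean eagerOperatorFails() {
        try {
            roundTrip(new EagerOperator());
            return false;
        } catch (NotSerializableException e) {
            return true;
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // True if the transient client is null on the deserialized copy (the source of the NPE).
    static boolean transientClientIsNullAfterShipping() {
        try {
            return roundTrip(new TransientOperator()).client() == null;
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("EagerOperator fails to serialize: " + eagerOperatorFails());
        System.out.println("TransientOperator client is null after shipping: " + transientClientIsNullAfterShipping());
    }
}
```

The transient field comes back as `null` because Java deserialization does not run the constructors or field initializers of a `Serializable` class, so whatever the constructor set up on the client side is simply gone on the worker.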

What am I missing here? How can I do DB lookups from a Flink job?

Answers (2)

David Anderson

The mechanism built into Flink for exactly this is a lookup join in Flink SQL.

That would look something like this:

-- Customers is backed by the JDBC connector and can be used for lookup joins
CREATE TEMPORARY TABLE Customers (
  id INT,
  name STRING,
  country STRING,
  zip STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
  'table-name' = 'customers'
);

-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;

For more about the JDBC SQL connector, see the docs.

Mikalai Lushchytski

As a workaround, you can mark the JdbcTemplate as transient and initialise it lazily, something like:


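private transient JdbcTemplate instance = null;

// Not thread-safe
public JdbcTemplate getInstance() {
    if (instance == null) {
        // init: create the DataSource and the JdbcTemplate here; this runs on the
        // TaskManager, after the function has been deserialized
    }

    return instance;
}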

and access it only through the getInstance() method. Note, however, that this way each parallel instance of the operator (i.e. each task slot) gets its own JdbcTemplate.
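
A plain-Java sketch of this pattern, assuming a hypothetical `FakeDbClient` in place of `JdbcTemplate` and a serialize/deserialize round trip (`ship`) in place of Flink distributing the function. Each shipped copy starts with a `null` field and builds its own client on first use, which is where the "one instance per slot" comes from.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class LazyTransientDemo {

    // Hypothetical stand-in for JdbcTemplate: not Serializable; counts constructions for illustration.
    static class FakeDbClient {
        static int created = 0;

        FakeDbClient() { created++; }

        String lookup(long key) { return "row-" + key; }
    }

    // Serializable function whose client is transient and created lazily on first use.
    static class LookupFunction implements Serializable {
        private transient FakeDbClient instance = null;

        // Not thread-safe, as in the answer; each deserialized copy builds its own client.
        FakeDbClient getInstance() {
            if (instance == null) {
                instance = new FakeDbClient();
            }
            return instance;
        }

        String process(long key) {
            return getInstance().lookup(key);
        }
    }

    // Rough stand-in for shipping the function to a worker: Java-serialize, then deserialize.
    @SuppressWarnings("unchecked")
    static <T> T ship(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        LookupFunction shipped = ship(new LookupFunction());
        System.out.println(shipped.process(42L)); // prints row-42
    }
}
```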

To have a single instance per TaskManager, you can make it a static variable. In that case, however, you need to take care of thread safety and make the initialiser thread-safe.
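
One way to write a thread-safe initialiser is double-checked locking on a `volatile` static field, sketched here in plain Java. `FakeDbClient`, `SharedClient` and the URL `jdbc:example://db` are hypothetical placeholders. A static field is shared per class loader, so "one per TaskManager" is an approximation, and this sketch never closes the shared client, which real code would have to handle.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class SharedClientDemo {

    // Hypothetical stand-in for JdbcTemplate; counts constructions.
    static class FakeDbClient {
        static final AtomicInteger CREATED = new AtomicInteger();
        final String url;

        FakeDbClient(String url) {
            this.url = url;
            CREATED.incrementAndGet();
        }
    }

    // One client per class loader, initialised lazily with double-checked locking.
    static final class SharedClient {
        private static volatile FakeDbClient instance;

        private SharedClient() {}

        static FakeDbClient get(String url) {
            FakeDbClient local = instance;             // fast path: one volatile read
            if (local == null) {
                synchronized (SharedClient.class) {
                    local = instance;                  // re-check under the lock
                    if (local == null) {
                        local = new FakeDbClient(url); // only the first caller's url is used
                        instance = local;
                    }
                }
            }
            return local;
        }
    }

    // Calls SharedClient.get from many threads at once; returns how many distinct clients were seen.
    static int distinctClientsSeen(int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            CountDownLatch start = new CountDownLatch(1);
            List<Future<FakeDbClient>> results = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                results.add(pool.submit(() -> {
                    start.await();
                    return SharedClient.get("jdbc:example://db"); // placeholder URL
                }));
            }
            start.countDown();
            Set<FakeDbClient> seen = Collections.newSetFromMap(new IdentityHashMap<>());
            for (Future<FakeDbClient> f : results) {
                seen.add(f.get());
            }
            return seen.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("distinct clients: " + distinctClientsSeen(8));
    }
}
```

Note that the `url` argument only matters for the very first caller; later calls get the existing instance even if they pass a different URL.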

Alternatively, again mark the variable transient, but instead of adding a getInstance() method, extend a rich function (RichMapFunction or whatever operation you have; KeyedProcessFunction already is one) and initialise the template in the open(Configuration parameters) method. open() is called once per parallel instance, before any records are processed, so it is the right place for one-time setup work.
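
What makes `open()` work is that it runs on the worker, after the function has been deserialized. Since `DataSource` is not serializable either, a common arrangement is to pass only serializable settings (such as the JDBC URL) to the constructor and build the client in `open()`. The plain-Java sketch below models that lifecycle; `SimulatedRichFunction`, `FilterCriteriaEvaluatorSketch`, `FakeDbClient` and `runOnWorker` are hypothetical names, not Flink's API, and `runOnWorker` only approximates what the Flink runtime does. In a real job you would override `open(...)` and `close()` on your `KeyedProcessFunction` instead (the exact `open` signature depends on the Flink version).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class OpenLifecycleDemo {

    // Hypothetical stand-in for JdbcTemplate: not Serializable.
    static class FakeDbClient {
        private final String url;

        FakeDbClient(String url) { this.url = url; }

        String lookup(long key) { return url + "/row-" + key; }
    }

    // Hypothetical mini-lifecycle loosely modelled on Flink's rich functions (NOT Flink's API).
    interface SimulatedRichFunction<I, O> extends Serializable {
        void open();
        O process(I value);
        void close();
    }

    // Only serializable settings travel with the function; the client is built in open().
    static class FilterCriteriaEvaluatorSketch implements SimulatedRichFunction<Long, String> {
        private final String jdbcUrl;          // serializable configuration
        private transient FakeDbClient client; // created on the worker, never serialized

        FilterCriteriaEvaluatorSketch(String jdbcUrl) { this.jdbcUrl = jdbcUrl; }

        @Override public void open() { client = new FakeDbClient(jdbcUrl); }

        @Override public String process(Long key) { return client.lookup(key); }

        // In real code, release connections / the pool here.
        @Override public void close() { client = null; }
    }

    // Java-serialize and deserialize: a rough stand-in for shipping the function to a worker.
    @SuppressWarnings("unchecked")
    static <T> T ship(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Rough imitation of the runtime: ship a copy, then open -> process each input -> close.
    static <I, O> List<O> runOnWorker(SimulatedRichFunction<I, O> fn, List<I> inputs) {
        SimulatedRichFunction<I, O> copy = ship(fn);
        copy.open();
        try {
            List<O> results = new ArrayList<>();
            for (I item : inputs) {
                results.add(copy.process(item));
            }
            return results;
        } finally {
            copy.close();
        }
    }

    public static void main(String[] args) {
        FilterCriteriaEvaluatorSketch fn = new FilterCriteriaEvaluatorSketch("jdbc:example://db");
        System.out.println(runOnWorker(fn, Arrays.asList(1L, 2L)));
    }
}
```

Unlike the lazy getter, this keeps the setup in one well-defined place and gives a matching `close()` hook for releasing connections.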
