soCalJack23

Reputation: 5

Implementing a Spring + Apache Flink project with Postgres

I have a Spring Boot Gradle project that uses Apache Flink to process a DataStream of signals. When a new signal arrives on the stream, I would like to look up its details (i.e. something like findById()) by an ID in an existing Postgres table, so that I can get additional information about the signal and enrich the data. I would like to avoid using Spring dependencies for the lookup (i.e. an autowired repository) and stick with a Flink implementation instead.

Where can I specify the Postgres connection configuration, such as port, database, URL, username and password? (For simplicity, assume the Postgres DB runs locally on my machine.) Is it as simple as adding the configuration to the application.properties file? If so, how can I write the query method that looks up a record in the Postgres table by a non-primary-key value?

Some online sources suggest using the following skeleton code, but I am not sure how or if it fits my use case. (I have created an EventEntity model that contains all the columns of the table I'm looking up.)

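    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class DatabaseMapper extends RichFlatMapFunction<String, EventEntity> {

        // Declare DB connection & query statements (mark them transient)

        @Override
        public void open(Configuration parameters) throws Exception {
            // Initialize DB connection
            // Prepare query statements
        }

        @Override
        public void flatMap(String value, Collector<EventEntity> out) throws Exception {
            // Run the prepared query for `value` and emit the enriched EventEntity via out.collect(...)
        }

        @Override
        public void close() throws Exception {
            // Close statements and connection
        }
    }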

Upvotes: 0

Views: 1791

Answers (1)

misterbaykal

Reputation: 542

Your skeleton has the right structure. Put all your custom PostgreSQL initialization and preparation code (opening the connection, preparing the statements) in the open() method, then use those pre-configured fields in your flatMap() function.
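
For the PostgreSQL case, one way to organize this is to keep the JDBC work in a small plain-Java class and call its open(), lookup() and close() methods from the corresponding methods of DatabaseMapper. This is only a sketch under assumptions: the table name events, the non-primary-key column signal_code, the selected columns and the EventRecord type (a Java 16+ record standing in for your EventEntity) are all hypothetical, and the PostgreSQL JDBC driver (org.postgresql:postgresql) has to be on the classpath. The connection settings are plain constructor arguments, so you can read them from any properties file (application.properties included) in your job's main method and pass them in. The class is Serializable so Flink can ship it to the tasks, while the connection itself is transient and only created in open().

```java
import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Optional;

// Hypothetical JDBC helper: open() once per parallel task, lookup() once per record.
public class PostgresEventLookup implements Serializable {

    // Hypothetical result type standing in for the question's EventEntity
    public record EventRecord(long id, String signalCode, String description) {}

    // Lookup by a non-primary-key column; table and column names are assumptions
    static final String SQL =
        "SELECT id, signal_code, description FROM events WHERE signal_code = ?";

    private final String url;      // e.g. jdbc:postgresql://localhost:5432/mydb
    private final String user;
    private final String password;

    // Not serialized with the function; created on the task in open()
    private transient Connection connection;
    private transient PreparedStatement statement;

    public PostgresEventLookup(String url, String user, String password) {
        this.url = url;
        this.user = user;
        this.password = password;
    }

    /** Call from RichFlatMapFunction.open(): connect and prepare the query once. */
    public void open() throws SQLException {
        connection = DriverManager.getConnection(url, user, password);
        statement = connection.prepareStatement(SQL);
    }

    /** Call from flatMap(): returns the first matching row, if any. */
    public Optional<EventRecord> lookup(String signalCode) throws SQLException {
        statement.setString(1, signalCode);
        try (ResultSet rs = statement.executeQuery()) {
            if (!rs.next()) {
                return Optional.empty();
            }
            return Optional.of(new EventRecord(
                rs.getLong("id"), rs.getString("signal_code"), rs.getString("description")));
        }
    }

    /** Call from RichFlatMapFunction.close(): release the JDBC resources. */
    public void close() throws SQLException {
        if (statement != null) {
            statement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
```

Inside DatabaseMapper you would hold a PostgresEventLookup field, call its open() from open(), call lookup(value) in flatMap() and collect the mapped result when one is present, and call its close() from close(). Each parallel instance of the operator gets its own connection this way.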

Here is a sample for Redis operations:

  • I have used RichAsyncFunction here, and I suggest you do the same, as it is the recommended practice for lookups against external systems. Read more here: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/asyncio.html
  • You can pass configuration parameters through the constructor and use them in your initialization code.

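    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.JedisPoolConfig;

    public class AsyncRedisOperations extends RichAsyncFunction<String, String> {

        private static final Logger LOG = LoggerFactory.getLogger(AsyncRedisOperations.class);

        private final Configuration redisConf;       // shipped with the function to each task
        private transient JedisPool jedisPool;       // created per task in open()
        private transient ExecutorService executor;  // runs the blocking Jedis calls

        public AsyncRedisOperations(Configuration redisConf) {
            this.redisConf = redisConf;
        }

        @Override
        public void open(Configuration parameters) {
            JedisPoolConfig poolConfig = new JedisPoolConfig();
            poolConfig.setMaxTotal(redisConf.getInteger("pool", 8));
            poolConfig.setMaxIdle(redisConf.getInteger("pool", 8));
            poolConfig.setMaxWaitMillis(redisConf.getInteger("maxWait", 0));

            jedisPool = new JedisPool(poolConfig,
                redisConf.getString("host", "192.168.10.10"),
                redisConf.getInteger("port", 6379), 5000);
            executor = Executors.newFixedThreadPool(redisConf.getInteger("pool", 8));

            // Borrow one connection to check connectivity; try-with-resources returns it to the pool
            try (Jedis jedis = jedisPool.getResource()) {
                LOG.info("Redis connected: " + jedis.isConnected());
            } catch (Exception e) {
                LOG.error("Exception while connecting to Redis", e);
            }
        }

        @Override
        public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
            // Run the blocking GET on the executor and complete Flink's future when it returns
            CompletableFuture.supplyAsync(() -> {
                try (Jedis jedis = jedisPool.getResource()) {
                    return jedis.get(key);
                }
            }, executor).whenComplete((value, error) -> {
                if (error != null) {
                    resultFuture.completeExceptionally(error);
                } else {
                    LOG.info("Redis value for key " + key + ": " + value);
                    resultFuture.complete(Collections.singleton(value));
                }
            });
        }

        @Override
        public void close() {
            if (executor != null) {
                executor.shutdown();
            }
            if (jedisPool != null) {
                jedisPool.close();
            }
        }
    }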
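
JDBC, like the Jedis calls above, is a blocking client. The Flink async I/O documentation notes that when no asynchronous client exists, a synchronous one can be turned into a limited concurrent client by handling the calls with a thread pool. The core of that pattern is sketched below with only JDK classes; the class name BlockingLookupExecutor and the idea of passing the lookup in as a Function are illustrative assumptions, not Flink API. In a RichAsyncFunction you would create it in open(), call lookupAsync(...) in asyncInvoke() and complete the ResultFuture in a whenComplete callback, and close it in close().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical helper: runs a blocking lookup (e.g. a JDBC query) on a dedicated pool
// so the calling thread is never blocked.
public class BlockingLookupExecutor<K, V> implements AutoCloseable {

    private final ExecutorService pool;
    private final Function<K, V> blockingLookup;

    public BlockingLookupExecutor(int threads, Function<K, V> blockingLookup) {
        // Pool size bounds how many lookups run concurrently
        this.pool = Executors.newFixedThreadPool(threads);
        this.blockingLookup = blockingLookup;
    }

    /** Returns immediately; the future completes when the blocking call finishes. */
    public CompletableFuture<V> lookupAsync(K key) {
        return CompletableFuture.supplyAsync(() -> blockingLookup.apply(key), pool);
    }

    @Override
    public void close() {
        // Let in-flight lookups finish, then release the threads
        pool.shutdown();
    }
}
```

A dedicated, bounded pool keeps slow database calls from occupying the JVM's shared common pool; size it together with the capacity you give AsyncDataStream.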
    

Upvotes: 0
