spoon

Reputation: 41

How to use Jedis in Flink map()

My code looks like this:

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  env.enableCheckpointing(500);

  DataStream<String> stream = env.addSource(getConsumer(TOPIC_1));

  Jedis jedis = new Jedis("master1");
  stream.map(new RichMapFunction<String, String>() {
      @Override
      public String map(String value) throws Exception {
          String result = jedis.hget("rtc", value);
          return result;
      }
  });

I want to get some data from Redis in map(), but the job cannot run because the Jedis class is not serializable.

How can I use a non-serializable class such as ZkClient or Jedis in map()?

Upvotes: 0

Views: 1094

Answers (1)

Till Rohrmann

Reputation: 13346

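All rich functions, such as RichMapFunction, have open(Configuration) and close() methods that you can override. These lifecycle methods are called after the function has been deployed to the TaskManager where it is executed, so anything you create in open() never has to be serialized. Create the Jedis client in open() and release it in close():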

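import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import redis.clients.jedis.Jedis;

class MyMapFunction extends RichMapFunction<String, String> {

    // transient: the client is never serialized with the function
    private transient Jedis jedis;

    @Override
    public void open(Configuration parameters) {
        // open connection to Redis, for example
        jedis = new Jedis("master1");
    }

    @Override
    public String map(String value) {
        // same lookup as in the question
        return jedis.hget("rtc", value);
    }

    @Override
    public void close() {
        // close connection to Redis
        if (jedis != null) {
            jedis.close();
        }
    }
}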
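
To see why the transient field plus open() pattern avoids the serialization error, here is a small sketch that uses only the Java standard library. It assumes the function object is shipped to the worker with plain Java serialization, and it uses hypothetical stand-ins: FakeClient plays the role of a non-serializable client like Jedis, and LookupFunction plays the role of the rich function. Neither is a Flink or Jedis class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldDemo {

    // Hypothetical stand-in for a non-serializable client such as Jedis.
    static class FakeClient {
        private final String host;

        FakeClient(String host) {
            this.host = host;
        }

        String lookup(String key) {
            return host + ":" + key;
        }
    }

    // Hypothetical stand-in for a rich function with an open() lifecycle method.
    static class LookupFunction implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String host;            // plain configuration, serialized
        private transient FakeClient client;  // skipped during serialization

        LookupFunction(String host) {
            this.host = host;
        }

        // Called on the "worker" side, after deserialization.
        void open() {
            client = new FakeClient(host);
        }

        boolean isOpen() {
            return client != null;
        }

        String map(String value) {
            return client.lookup(value);
        }
    }

    // Serializes and deserializes the function, imitating shipping it to a worker.
    static LookupFunction roundTrip(LookupFunction fn) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(fn);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LookupFunction) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        LookupFunction shipped = roundTrip(new LookupFunction("master1"));
        System.out.println(shipped.isOpen());   // false: the client was not shipped
        shipped.open();                         // the client is created on the worker side
        System.out.println(shipped.map("rtc")); // master1:rtc
    }
}
```

Only the host name travels with the function; the client itself is built after deserialization. Removing the transient keyword from the client field and calling open() before roundTrip should make writeObject fail with a NotSerializableException, which is the same kind of failure the question runs into.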

Upvotes: 3
