Reputation: 41
My code looks like this:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(500);
DataStream<String> stream = env.addSource(getConsumer(TOPIC_1));
Jedis jedis = new Jedis("master1");
stream.map(new RichMapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
String result = jedis.hget("rtc", value);
return result;
}
});
I want to fetch some data from Redis in map(), but the job cannot run because Jedis is not serializable.
How can I use a non-serializable class, such as ZkClient or Jedis, in map()?
Upvotes: 0
Views: 1094
Reputation: 13346
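All rich functions, such as RichMapFunction, have open(Configuration) and close() lifecycle methods that you can override. They are called on the TaskManager after the function has been deployed there: open() runs once per parallel instance before any records are processed, and close() runs when the function shuts down. Create the non-serializable client in open() and keep it in a transient field, so it is never serialized together with the function.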
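import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import redis.clients.jedis.Jedis;

class MyMapFunction extends RichMapFunction<String, String> {
    // transient: not serialized with the function; created on the TaskManager in open()
    private transient Jedis jedis;
    @Override
    public void open(Configuration parameters) {
        // open connection to Redis, for example
        jedis = new Jedis("master1");
    }
    @Override
    public String map(String value) {
        return jedis.hget("rtc", value);
    }
    @Override
    public void close() {
        // close connection to Redis
        if (jedis != null) {
            jedis.close();
        }
    }
}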
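To see why the transient field plus open() combination works, here is a minimal sketch that uses only plain Java serialization and no Flink or Jedis dependency. FakeClient, LookupFunction and roundTrip are hypothetical stand-ins invented for this illustration: FakeClient plays the role of a non-serializable client such as Jedis, and roundTrip imitates, in spirit only, a function being shipped to a worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class TransientClientDemo {

    // Hypothetical stand-in for a non-serializable client (Jedis, ZkClient, ...).
    static class FakeClient {
        private final Map<String, String> data = new HashMap<>();

        FakeClient(String host) {
            data.put("key1", "value-from-" + host);
        }

        String get(String key) {
            return data.get(key);
        }
    }

    // Mimics the shape of a rich function: serializable, with an open() hook.
    static class LookupFunction implements Serializable {
        private static final long serialVersionUID = 1L;

        // Serializable configuration travels with the function.
        private final String host;
        // Skipped by serialization; created after deserialization in open().
        private transient FakeClient client;

        LookupFunction(String host) {
            this.host = host;
        }

        void open() {
            client = new FakeClient(host);
        }

        String map(String key) {
            return client.get(key);
        }
    }

    // Serializes and deserializes the function, as an assumption-level
    // imitation of shipping it to another process.
    static LookupFunction roundTrip(LookupFunction fn) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(fn);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (LookupFunction) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        LookupFunction shipped = roundTrip(new LookupFunction("master1"));
        shipped.open();                           // client is created on the "worker" side
        System.out.println(shipped.map("key1"));  // prints "value-from-master1"
    }
}
```

If the client field were not transient, writeObject would fail with a NotSerializableException, which is the same kind of failure the question runs into. In the Flink job itself, the function from this answer would presumably be used as stream.map(new MyMapFunction()).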
Upvotes: 3