Reputation: 307
My scenario: I want to trigger one stream based on the input of another stream. The two streams have different types. My sample code is below. I want to trigger one stream when a message is received from the Kafka stream.
At application startup, I read data from the DB. Later, whenever I receive a Kafka message in the stream, I want to read the data from the DB again. This is my actual use case.
How can I achieve this? Is it possible?
public class DataStreamCassandraExample implements Serializable{
private static final long serialVersionUID = 1L;
static Logger LOG = LoggerFactory.getLogger(DataStreamCassandraExample.class);
private transient static StreamExecutionEnvironment env;
static DataStream<Tuple4<UUID,String,String,String>> inputRecords;
public static void main(String[] args) throws Exception {
env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool argParameters = ParameterTool.fromArgs(args);
env.getConfig().setGlobalJobParameters(argParameters);
Properties kafkaProps = new Properties();
kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group1");
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("testtopic", new SimpleStringSchema(), kafkaProps);
ClusterBuilder cb = new ClusterBuilder() {
private static final long serialVersionUID = 1L;
@Override
public Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoint("127.0.0.1")
.withPort(9042)
.withoutJMXReporting()
.build();
}
};
CassandraInputFormat<Tuple4<UUID,String,String,String>> cassandraInputFormat =
new CassandraInputFormat<> ("select * from employee_details", cb);
// At application startup, read data from the table and emit it as a stream
inputRecords = getDBData(env,cassandraInputFormat);
// Whenever a message arrives from Kafka, I want to read the data from the table again.
// How do I trigger getDBData() from inside this stream?
// The code below does not work.
DataStream<String> inputRecords1= env.addSource(kafkaConsumer)
.map(new MapFunction<String,String>() {
private static final long serialVersionUID = 1L;
@Override
public String map(String value) throws Exception {
inputRecords = getDBData(env,cassandraInputFormat);
return "OK";
}
});
// Nothing is printed when I call getDBData() from inside the Kafka stream.
inputRecords1.print();
DataStream<Employee> empDataStream = inputRecords.map(new MapFunction<Tuple4<UUID,String,String,String>, Tuple2<String,Employee>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Employee> map(Tuple4<UUID,String,String,String> value) throws Exception {
Employee emp = new Employee();
try{
emp.setEmpid(value.f0);
emp.setFirstname(value.f1);
emp.setLastname(value.f2);
emp.setAddress(value.f3);
}
catch(Exception e){
}
return new Tuple2<>(emp.getEmpid().toString(), emp);
}
}).keyBy(0).map(new MapFunction<Tuple2<String,Employee>,Employee>() {
private static final long serialVersionUID = 1L;
@Override
public Employee map(Tuple2<String, Employee> value)
throws Exception {
return value.f1;
}
});
empDataStream.print();
env.execute();
}
private static DataStream<Tuple4<UUID,String,String,String>> getDBData(StreamExecutionEnvironment env,
CassandraInputFormat<Tuple4<UUID,String,String,String>> cassandraInputFormat){
DataStream<Tuple4<UUID,String,String,String>> inputRecords = env
.createInput
(cassandraInputFormat
,TupleTypeInfo.of(new TypeHint<Tuple4<UUID,String,String,String>>() {}));
return inputRecords;
}
}
Upvotes: 1
Views: 1023
Reputation: 382
This is going to be a fairly long answer.
To use Flink correctly as a developer, you need an understanding of its basic concepts. I suggest you start with the architecture overview (https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/flink-architecture.html); it contains all you need to know to get into the world of Flink if you come from a general programming background.
Now, looking at your code: it will not do what you expect because of how Flink reads it. Flink goes through at least two big steps when it executes your code. First, it builds an execution graph, which only describes what needs to be done. This happens at the job manager level. Second, it asks one or more workers to execute the graph. These two steps are sequential, and anything you do to describe the graph has to be done at the job manager level, not inside your operations.
In your case, the graph is built as follows. The line inputRecords = getDBData(env,cassandraInputFormat); creates an orphan branch of the graph, and the line DataStream<Employee> empDataStream = inputRecords.map... appends a map->keyBy->map branch to that orphan branch. This builds a part of the graph that reads all the employee records from Cassandra and applies the map->keyBy->map transformations. It is not linked to the Kafka source in any way.
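The two-step behaviour described above can be illustrated with a toy pipeline written in plain Java. This is only a sketch and not Flink's API: the Pipeline class, its map and execute methods, and the error message are all made up for illustration. The toy rejects loudly any attempt to extend the graph from inside a running operation. A real Flink job may fail differently or simply ignore the extra branch, depending on how it is deployed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Toy model of a lazily built pipeline; this is NOT Flink's API. */
public class LazyPipelineSketch {

    /** Step 1 collects the operations; step 2 (execute) runs them over the input. */
    static final class Pipeline {
        private final List<Function<String, String>> steps = new ArrayList<>();
        private boolean running = false;

        /** Describes one more operation. Only allowed while the graph is being built. */
        Pipeline map(Function<String, String> step) {
            if (running) {
                // The graph is fixed once execution has started.
                throw new IllegalStateException("graph is already being executed");
            }
            steps.add(step);
            return this;
        }

        /** Runs every record through all described steps, in order. */
        List<String> execute(List<String> input) {
            running = true;
            List<String> out = new ArrayList<>();
            for (String record : input) {
                String value = record;
                for (Function<String, String> step : steps) {
                    value = step.apply(value);
                }
                out.add(value);
            }
            return out;
        }
    }

    /** Tries to add a branch from inside an operation, like the map() in the question. */
    static String runAndReport() {
        Pipeline pipeline = new Pipeline();
        pipeline.map(value -> {
            pipeline.map(String::toUpperCase); // too late: execution has already started
            return "OK";
        });
        try {
            pipeline.execute(List.of("kafka-message"));
            return "extended";
        } catch (IllegalStateException e) {
            return "rejected: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndReport());
    }
}
```

The point of the sketch is that everything describing the graph has to happen before execute is called, which is why calling getDBData() from inside a map function cannot add a new source to the job.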
Now let's get back to your requirement. As I understand it, you need to fetch the data of an employee when his/her ID arrives from Kafka and then perform some operations on it.
The cleanest way to handle this is called Side Inputs. A side input is a data input that you declare when you build your graph; the job manager handles reading the data and transmitting it to the workers. The bad news is that Side Inputs do not yet work for streaming jobs in Flink (https://issues.apache.org/jira/browse/FLINK-2491 - this bug prevents streaming jobs from creating checkpoints because side inputs finish quickly, which puts the job in a bizarre state).
That said, you still have three more options. The right one depends on the size of your employee Cassandra table.
The second option is to load all employees into a static final variable employees and use it inside your map functions. The downside of this approach is that the job manager will send a serialized copy of this variable to all workers, which may congest your network and may also overload the RAM. If the table is small and is not expected to grow much in the future, this may be an acceptable workaround until Side Inputs work for streaming jobs. If the table is big or is expected to grow, consider the third option.
The third option is an improvement on the second one. It uses Flink's broadcast state (see https://flink.apache.org/2019/06/26/broadcast-state.html and https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html). In short, it is the same as before with better transfer management: Flink will find the best way to store the data and send it to the workers. This approach is, however, a little more complicated to implement correctly.
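To make the idea behind the third option more concrete, here is a plain-Java model of the pattern. It does not use any Flink classes. The Worker class, the employee IDs, and the names are hypothetical. In Flink itself, the two callbacks would roughly correspond to processBroadcastElement and processElement of a BroadcastProcessFunction, and the map would be kept in broadcast state rather than in a plain HashMap.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the broadcast pattern; it does not use Flink classes. */
public class BroadcastLookupSketch {

    /** One parallel worker holding its own full copy of the reference data. */
    static final class Worker {
        private final Map<String, String> employeesById = new HashMap<>();
        private final List<String> output = new ArrayList<>();

        /** Called for every reference record; every worker receives all of them. */
        void onBroadcastElement(String employeeId, String fullName) {
            employeesById.put(employeeId, fullName);
        }

        /** Called for every event of the main stream, for example a Kafka message. */
        void onElement(String employeeId) {
            String fullName = employeesById.get(employeeId);
            output.add(employeeId + " -> " + (fullName == null ? "unknown" : fullName));
        }

        List<String> output() {
            return output;
        }
    }

    /** Broadcasts two hypothetical employees, then routes three events to two workers. */
    static List<String> demo() {
        List<Worker> workers = List.of(new Worker(), new Worker());
        // Broadcast side: each reference record is delivered to every worker.
        for (Worker worker : workers) {
            worker.onBroadcastElement("e1", "Ada Lovelace");
            worker.onBroadcastElement("e2", "Alan Turing");
        }
        // Event side: each event is handled by exactly one worker.
        workers.get(0).onElement("e1");
        workers.get(1).onElement("e2");
        workers.get(1).onElement("e3");

        List<String> all = new ArrayList<>();
        for (Worker worker : workers) {
            all.addAll(worker.output());
        }
        return all;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Because every worker sees every reference record, a lookup never has to leave the worker, whichever worker an event lands on. The price is that each worker keeps a full copy of the table in memory.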
The last option is not advisable in normal circumstances. It simply consists of making a call to Cassandra inside your map operation. This is not good practice because it adds latency to every map execution (there will be as many calls as there are items passing through Kafka). A call means creating a connection, sending the query, waiting for Cassandra to reply, and freeing the connection. That can be a lot of work for a single step in your graph. It is a solution to consider only when you really cannot find any alternative.
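If you do end up with this last option, part of the per-record cost can usually be reduced by opening the connection once and reusing it for every record. The sketch below shows the difference in plain Java. FakeSession is a hypothetical stand-in for a database session and is not the Cassandra driver. In a Flink job, the shared variant would typically live in a RichMapFunction, which provides open and close methods for this kind of setup and cleanup. Every record still pays the round trip of its own query.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Compares one session per record with one shared session; not real driver code. */
public class PerRecordLookupSketch {

    /** Hypothetical stand-in for a database session; counts how often one is opened. */
    static final class FakeSession implements AutoCloseable {
        static int opened = 0;
        private final Map<String, String> table = Map.of("e1", "Ada Lovelace");

        FakeSession() {
            opened++;
        }

        String findName(String employeeId) {
            return table.getOrDefault(employeeId, "unknown");
        }

        @Override
        public void close() {
            // A real session would release its network resources here.
        }
    }

    /** Naive variant: connect, query, and disconnect for every single record. */
    static List<String> lookupWithSessionPerRecord(List<String> ids) {
        List<String> out = new ArrayList<>();
        for (String id : ids) {
            try (FakeSession session = new FakeSession()) {
                out.add(session.findName(id));
            }
        }
        return out;
    }

    /** Shared variant: connect once, then issue one query per record. */
    static List<String> lookupWithSharedSession(List<String> ids) {
        List<String> out = new ArrayList<>();
        try (FakeSession session = new FakeSession()) {
            for (String id : ids) {
                out.add(session.findName(id));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> ids = List.of("e1", "e2", "e1");
        System.out.println(lookupWithSessionPerRecord(ids));
        System.out.println(lookupWithSharedSession(ids));
        System.out.println("sessions opened: " + FakeSession.opened);
    }
}
```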
For your case, I would advise the third option. I assume the employee table is not very big, so using broadcast state is a good choice.
Upvotes: 2