Reputation: 457
I am trying to map each row of a CSV file, produced to Kafka and consumed by Flink, into a Tuple4. The CSV file has 4 columns, and each row should become one Tuple4. The problem is that I do not know how to implement the map() and csv2Tuple functions.
Here is where I am stuck:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool parameterTool = ParameterTool.fromArgs(args);
DataStreamSource<String> myConsumer = env.addSource(new FlinkKafkaConsumer082<>(
        parameterTool.getRequired("topic"),
        new SimpleStringSchema(),
        parameterTool.getProperties()));
DataStream<Tuple4<Integer, Integer, Integer, Integer>> streamTuple = myConsumer.map(new csv2Tuple());

public static class csv2Tuple implements MapFunction<...> {
    public void map() {...}
}
I would also like to parse the items in the tuple from String to Integer.
Upvotes: 0
Views: 1151
Reputation: 3427
Suppose you produce every row of the CSV file as a Kafka message and consume it using the Flink Kafka connector. Then you just have to split every consumed message on the , character (because it is a CSV file).
DataStream<Tuple4<Integer, Integer, Integer, Integer>> streamTuple = myConsumer.map(new MapFunction<String, Tuple4<Integer, Integer, Integer, Integer>>() {
    @Override
    public Tuple4<Integer, Integer, Integer, Integer> map(String str) throws Exception {
        // Split the CSV line on commas and parse each of the 4 columns to an Integer.
        String[] temp = str.split(",");
        return new Tuple4<>(
                Integer.parseInt(temp[0]),
                Integer.parseInt(temp[1]),
                Integer.parseInt(temp[2]),
                Integer.parseInt(temp[3])
        );
    }
});
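If you prefer the named csv2Tuple class from your question over an anonymous inner class, the same logic can be factored out like the sketch below. This just fills in the generic parameters you left as <...>; the imports are the standard Flink ones. Note that Integer.parseInt will throw a NumberFormatException on a malformed row and fail the job unless you catch it.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple4;

// Named variant of the anonymous MapFunction above, completing the
// csv2Tuple stub from the question with concrete generic types.
public static class csv2Tuple implements MapFunction<String, Tuple4<Integer, Integer, Integer, Integer>> {
    @Override
    public Tuple4<Integer, Integer, Integer, Integer> map(String str) throws Exception {
        String[] temp = str.split(",");
        return new Tuple4<>(
                Integer.parseInt(temp[0]),
                Integer.parseInt(temp[1]),
                Integer.parseInt(temp[2]),
                Integer.parseInt(temp[3]));
    }
}

Then your original call site works unchanged:

DataStream<Tuple4<Integer, Integer, Integer, Integer>> streamTuple = myConsumer.map(new csv2Tuple());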
Upvotes: 1