Salvador Vigo

Reputation: 457

Flink map stream csv file into Tuple

I am trying to map a CSV file into Tuple4s. The rows of the file are produced to Kafka and consumed by Flink. The CSV has 4 columns, and I want to map each row into a Tuple4. The problem is that I do not know how to implement the map() call and the csv2Tuple class.

Here is where I am stuck:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

ParameterTool parameterTool = ParameterTool.fromArgs(ARGS);

DataStreamSource<String> myConsumer = env.addSource(new FlinkKafkaConsumer082<>(parameterTool.getRequired("topic"),
            new SimpleStringSchema(), parameterTool.getProperties()));

DataStream<Tuple4<Integer, Integer, Integer, Integer>> streamTuple = myConsumer.map(new csv2Tuple());
public static class csv2Tuple implements MapFunction<...> {
    public void map() { ... }
}

I would also like to parse the items in the tuple from String to Integer.

Upvotes: 0

Views: 1151

Answers (1)

Soheil Pourbafrani

Reputation: 3427

Suppose you produce every row of the CSV file as a Kafka message and consume it with the Flink Kafka connector. Then you just need to split each consumed message on the comma (because it is a CSV file) and parse each field as an integer.
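One detail worth knowing before indexing into the split result: Java's `String.split(",")` drops trailing empty strings, so a row with an empty last column yields fewer than four fields. The sketch below is plain Java, outside Flink, and the class and method names (`SplitDemo`, `splitDefault`, `splitKeepingEmpty`) are hypothetical; it only illustrates the standard `split(String)` versus `split(String, int)` behavior.

```java
import java.util.Arrays;

public class SplitDemo {
    // Hypothetical helper: split(",") removes trailing empty strings.
    static String[] splitDefault(String row) {
        return row.split(",");
    }

    // Hypothetical helper: a negative limit keeps trailing empty strings.
    static String[] splitKeepingEmpty(String row) {
        return row.split(",", -1);
    }

    public static void main(String[] args) {
        String row = "1,2,3,"; // fourth column is empty
        // Only 3 fields remain, so temp[3] would throw ArrayIndexOutOfBoundsException.
        System.out.println(Arrays.toString(splitDefault(row)));      // [1, 2, 3]
        // 4 fields, the last one is "", so Integer.parseInt(temp[3]) would throw NumberFormatException.
        System.out.println(Arrays.toString(splitKeepingEmpty(row))); // [1, 2, 3, ]
    }
}
```

Either way an empty column causes an exception in the map function; the difference is only which exception is thrown.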

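import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;

// Split each consumed message on commas and parse the four columns as integers.
DataStream<Tuple4<Integer, Integer, Integer, Integer>> streamTuple = myConsumer.map(
        new MapFunction<String, Tuple4<Integer, Integer, Integer, Integer>>() {
            @Override
            public Tuple4<Integer, Integer, Integer, Integer> map(String str) throws Exception {
                String[] temp = str.split(",");
                // Integer.parseInt returns int, which is autoboxed into the Integer fields of Tuple4.
                return new Tuple4<>(
                        Integer.parseInt(temp[0]),
                        Integer.parseInt(temp[1]),
                        Integer.parseInt(temp[2]),
                        Integer.parseInt(temp[3])
                );
            }
        });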
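The map above throws on any malformed message (a header line, a missing column), which fails the job. A `MapFunction` must return exactly one value per input, so in Flink skipping bad rows is usually done with a `FlatMapFunction` that emits nothing for them. Below is a minimal plain-Java sketch of that parse-or-skip logic, assuming every valid row has exactly four integer columns; `SafeCsvParser` and `tryParse` are hypothetical names, and an `int[]` stands in for `Tuple4` so it runs without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class SafeCsvParser {
    // Hypothetical helper: returns the four parsed columns, or empty if the row is malformed.
    static Optional<int[]> tryParse(String row) {
        String[] fields = row.split(",", -1); // keep trailing empty fields so "1,2,3," is rejected
        if (fields.length != 4) {
            return Optional.empty();
        }
        int[] values = new int[4];
        try {
            for (int i = 0; i < 4; i++) {
                // trim() tolerates spaces around separators, e.g. "5, 6, 7, 8"
                values[i] = Integer.parseInt(fields[i].trim());
            }
        } catch (NumberFormatException e) {
            return Optional.empty(); // e.g. a header line or an empty column
        }
        return Optional.of(values);
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("1,2,3,4", "a,b,c,d", "5, 6, 7, 8", "1,2,3,");
        List<int[]> parsed = new ArrayList<>();
        for (String m : messages) {
            // Malformed rows are skipped, like a flatMap that emits nothing for them.
            tryParse(m).ifPresent(parsed::add);
        }
        for (int[] p : parsed) {
            System.out.println(Arrays.toString(p)); // [1, 2, 3, 4] then [5, 6, 7, 8]
        }
    }
}
```

Inside a Flink `flatMap(String value, Collector<...> out)`, the same check would call `out.collect(...)` only when parsing succeeds.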

Upvotes: 1
