Thomas

Reputation: 31

Apache Flink transform DataStream (source) to a List?

How can I convert a DataStream into a List, for example so that I can iterate over its elements?

My code looks like this:

package flinkoracle;

//imports
//....

public class FlinkOracle {

    final static Logger LOG = LoggerFactory.getLogger(FlinkOracle.class);

    public static void main(String[] args) {
        LOG.info("Starting...");
        // get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // one STRING type per selected column; the wildcard avoids a raw-type warning
        TypeInformation<?>[] fieldTypes = new TypeInformation<?>[]{BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO};

        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
        //get the source from Oracle DB
        DataStream<?> source = env
                .createInput(JDBCInputFormat.buildJDBCInputFormat()
                        .setDrivername("oracle.jdbc.driver.OracleDriver")
                        .setDBUrl("jdbc:oracle:thin:@localhost:1521")
                        .setUsername("user")
                        .setPassword("password")
                        .setQuery("select * from  table1")
                        .setRowTypeInfo(rowTypeInfo)
                        .finish());

        source.print().setParallelism(1);

        try {
            LOG.info("----------BEGIN----------");
            env.execute();
            LOG.info("----------END----------");
        } catch (Exception e) {
            // log the failure through the class logger instead of printing the stack trace
            LOG.error("Job execution failed", e);
        }
        LOG.info("End...");
    }

}

Thanks a lot in advance. Best regards, Tamas

Upvotes: 3

Views: 5166

Answers (2)

Jake

Reputation: 673

In newer Flink versions, DataStreamUtils::collect has been deprecated. Instead, you can use DataStream::executeAndCollect, which, if given a limit, returns a List of at most that size.

var list = source.executeAndCollect(100);

If you do not know how many elements there are, or if you simply want to iterate through the results without loading them all into memory at once, you can use the no-arg version to get a CloseableIterator:

try (var iterator = source.executeAndCollect()) {
  // do something
}
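
The try-with-resources form is worth keeping because, as far as I know, the returned iterator should be closed to release the job's resources. Below is a minimal plain-Java sketch of that control flow, with no Flink dependency. ClosingIterator and fromList are hypothetical stand-ins for Flink's CloseableIterator and for source.executeAndCollect(), so it illustrates the pattern only, not Flink's actual API.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for Flink's CloseableIterator: an Iterator that must be closed.
interface ClosingIterator<T> extends Iterator<T>, AutoCloseable {
    @Override
    void close(); // narrowed so callers need not handle a checked exception
}

final class StreamingIteration {

    // Hypothetical stand-in for source.executeAndCollect():
    // wraps a list and records in closedFlag[0] whether close() was called.
    static <T> ClosingIterator<T> fromList(List<T> items, boolean[] closedFlag) {
        Iterator<T> inner = items.iterator();
        return new ClosingIterator<T>() {
            @Override public boolean hasNext() { return inner.hasNext(); }
            @Override public T next() { return inner.next(); }
            @Override public void close() { closedFlag[0] = true; }
        };
    }

    // Visits the elements one at a time without building a list,
    // and closes the iterator even if processing throws.
    static int totalLength(List<String> items, boolean[] closedFlag) {
        int total = 0;
        try (ClosingIterator<String> it = fromList(items, closedFlag)) {
            while (it.hasNext()) {
                total += it.next().length();
            }
        }
        return total;
    }

    public static void main(String[] args) {
        boolean[] closed = {false};
        int total = totalLength(Arrays.asList("a", "bb", "ccc"), closed);
        System.out.println(total + " characters, closed=" + closed[0]);
    }
}
```

With Flink on the classpath, the body of the while loop is where each record would be handled.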

Upvotes: 0

Nizar

Reputation: 436

Flink provides an iterator sink to collect DataStream results for testing and debugging purposes. It can be used as follows:

import org.apache.flink.contrib.streaming.DataStreamUtils;

DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult);

You can copy an iterator to a new list like this:

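// requires java.util.ArrayList and java.util.List
List<Tuple2<String, Integer>> list = new ArrayList<>();
while (myOutput.hasNext())
    list.add(myOutput.next());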
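
For reference, here is the same copy step as a small self-contained helper in plain Java, independent of Flink. The class and method names (IteratorToList, toList) are made up for this sketch. Note that draining an iterator this way only finishes if the stream is bounded, as with the JDBC query in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class IteratorToList {

    // Drains the iterator into a new ArrayList, preserving encounter order.
    static <T> List<T> toList(Iterator<T> iterator) {
        List<T> list = new ArrayList<>();
        iterator.forEachRemaining(list::add);
        return list;
    }

    public static void main(String[] args) {
        // A plain list iterator stands in for the iterator returned by Flink.
        Iterator<String> rows = Arrays.asList("row1", "row2", "row3").iterator();
        List<String> collected = toList(rows);
        System.out.println(collected);
    }
}
```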

Flink also provides several simple write*() methods on DataStream that are mainly intended for debugging. When data is flushed to the target system depends on the implementation of the OutputFormat, so elements sent to the OutputFormat do not necessarily show up in the target system immediately. Note: these write*() methods do not participate in Flink's checkpointing, so records may be lost if a failure occurs.

writeAsText() / TextOutputFormat
writeAsCsv(...) / CsvOutputFormat
print() / printToErr()
writeUsingOutputFormat() / FileOutputFormat
writeToSocket

Source: link.

You may need to add the following dependency to use DataStreamUtils:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-contrib</artifactId>
    <version>0.10.2</version>
</dependency>

Upvotes: 1
