Soheil Pourbafrani

Reputation: 3427

How to load data from Cassandra to Apache Flink DataStream

I'm trying to get data from Cassandra using Apache Flink. Following this post, I can read the data, but I don't know how to load it into a DataStream object. The following is the code:

ClusterBuilder cb = new ClusterBuilder() {
            @Override
            public Cluster buildCluster(Cluster.Builder builder) {
                return builder.addContactPoint("localhost")
                        /*.withCredentials("hduser".trim(), "hadoop".trim())*/
                        .build();
            }
        };
CassandraInputFormat<Tuple2<UUID, String>> cassandraInputFormat = new CassandraInputFormat<Tuple2<UUID, String>>(query, cb);

cassandraInputFormat.configure(null);
cassandraInputFormat.open(null);

Tuple2<UUID, String> testOutputTuple = new Tuple2<>();
ByteArrayOutputStream res = new ByteArrayOutputStream();
res.reset();

while (!cassandraInputFormat.reachedEnd()) {
    cassandraInputFormat.nextRecord(testOutputTuple);
    res.write((testOutputTuple.f0.toString() + "," + testOutputTuple.f1).getBytes());
}

I tried

DataStream<byte[]> temp = new DataStream<byte[]>(env, new StreamTransformation<byte[]>(res.toByteArray()));

to load the data in the res variable into a DataStream<byte[]> object, but it isn't the correct way. How can I do that? And is my approach to reading Cassandra suitable for stream processing?

Upvotes: 1

Views: 648

Answers (2)

Mgreg

Reputation: 101

Reading data from a database is a finite task. You should use the DataSet API, not the DataStream API, when using CassandraInputFormat. For example:

DataSet<Tuple2<Long, Date>> ds = env.createInput(executeQuery(YOUR_QUERY), TupleTypeInfo.of(new TypeHint<Tuple2<Long, Date>>() {}));

private static CassandraInputFormat<Tuple2<Long, Date>> executeQuery(String YOUR_QUERY) throws IOException {
    return new CassandraInputFormat<>(YOUR_QUERY, new ClusterBuilder() {
        private static final long serialVersionUID = 1;

        @Override
        protected Cluster buildCluster(com.datastax.driver.core.Cluster.Builder builder) {
            return builder.addContactPoints(CASSANDRA_HOST).build();
        }
    });
}
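Putting the two snippets above together, a minimal self-contained sketch of the batch job might look like the following. The query string, host, keyspace/table name, and field types are illustrative placeholders, and this assumes flink-java and flink-connector-cassandra on the classpath:

```java
import java.util.Date;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

import com.datastax.driver.core.Cluster;

public class CassandraBatchJob {

    private static final String CASSANDRA_HOST = "localhost"; // placeholder

    public static void main(String[] args) throws Exception {
        // Batch environment: a Cassandra query is a finite input.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // "example.table" is a hypothetical keyspace/table.
        DataSet<Tuple2<Long, Date>> ds = env.createInput(
                executeQuery("SELECT id, ts FROM example.table;"),
                TupleTypeInfo.of(new TypeHint<Tuple2<Long, Date>>() {}));

        ds.print(); // triggers execution and prints each row
    }

    private static CassandraInputFormat<Tuple2<Long, Date>> executeQuery(String query) {
        return new CassandraInputFormat<>(query, new ClusterBuilder() {
            private static final long serialVersionUID = 1;

            @Override
            protected Cluster buildCluster(Cluster.Builder builder) {
                return builder.addContactPoints(CASSANDRA_HOST).build();
            }
        });
    }
}
```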

Upvotes: 1

user1549692

Reputation: 93

Creating a DataStream in Flink always starts from a StreamExecutionEnvironment.

Instead of:

DataStream<byte[]> temp = new DataStream<byte[]>(env, new StreamTransformation<byte[]>(res.toByteArray())); 

Try:

DataStream<Tuple2<UUID, String>> raw = env.createInput(cassandraInputFormat);

You can then use a map function to transform the tuples into whatever type you need.
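The per-record conversion inside such a map is plain Java. As a hedged sketch (the toCsv helper and the raw.map(...) wiring shown in the comment are illustrative, not from the original post), the tuples could be turned into CSV lines like this:

```java
import java.util.UUID;

public class TupleToCsv {

    // Logic one might wrap in a Flink map(), e.g.:
    //   DataStream<String> lines = raw.map(t -> toCsv(t.f0, t.f1));
    static String toCsv(UUID key, String value) {
        // UUID is converted via toString() by string concatenation
        return key + "," + value;
    }

    public static void main(String[] args) {
        UUID id = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");
        System.out.println(toCsv(id, "hello"));
    }
}
```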


I have not used the Cassandra connector itself, so I don't know whether you are using that part correctly.

Upvotes: 0
