Reputation: 2816
I was following an example of connecting Cassandra as a sink in Flink.
My code is shown below:
public class writeToCassandra {

    private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE test WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
    private static final String createTable = "CREATE TABLE test.cassandraData(id varchar, heart_rate varchar, PRIMARY KEY(id));";
    private final static Collection<String> collection = new ArrayList<>(50);

    static {
        for (int i = 1; i <= 50; ++i) {
            collection.add("element " + i);
        }
    }

    public static void main(String[] args) throws Exception {
        // setting the env variable to local
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment(1);

        DataStream<Tuple2<String, String>> dataStream = environment
            .fromCollection(collection)
            .map(new MapFunction<String, Tuple2<String, String>>() {
                final String mapped = " mapped ";
                String[] splitted;

                public Tuple2<String, String> map(String s) throws Exception {
                    splitted = s.split("\\s+");
                    return Tuple2.of(
                        splitted[0] + mapped + splitted[1]
                        // ...
        // ...
            .setQuery("INSERT INTO test.cassandraData(id,heart_rate) values (?,?);")
        // ...
    } //main
} //writeToCassandra
I am getting the following error:
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: / (com.datastax.driver.core.exceptions.TransportException: [/] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(
Upvotes: 1
Views: 895
Reputation: 217
The exception simply indicates that the example program cannot reach the Cassandra (C*) database: the driver could not open a connection to any of the hosts it tried.
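One quick way to check this, as a minimal sketch, is to try opening a plain TCP connection to the Cassandra node from the machine that runs the Flink job. The class name `CassandraPortCheck`, the helper `canConnect`, the default host `127.0.0.1` and the 2-second timeout are assumptions for illustration only. 9042 is Cassandra's default native transport port, so substitute whatever address and port your cluster actually uses.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Flink or the DataStax driver.
public class CassandraPortCheck {

    // Returns true if a TCP connection to host:port can be opened within the timeout.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed defaults: a local node on Cassandra's default native transport port.
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9042;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 2000));
    }
}
```

If this reports `false`, the cause is most likely outside the Flink job: the node is not running, the address or port is wrong, or something on the network is blocking the connection.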
Upvotes: 0
Reputation: 1114
Not sure if this is always required, but the way that I set up my CassandraSink is like this:
.setClusterBuilder(new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return Cluster.builder()
My dataStream returns annotated POJOs, so I don't need the query, but in your case you would just include ".setQuery(...)" after the ".addSink(...)" line.
Upvotes: 1