M_Gh

Reputation: 1142

The implementation of the AbstractCassandraTupleSink is not serializable

I created a program that counts words in Wikipedia edit titles, and it runs without errors. I then created a Cassandra table with two columns, `word` (text) and `count` (bigint). The problem appears when I try to write the words and counts to the Cassandra table. My program is as follows:

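    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
    import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
    import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
    import org.apache.flink.util.Collector;

    public class WordCount_in_cassandra {
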
    public static void main(String[] args) throws Exception {

        // Checking input parameters
        final ParameterTool params = ParameterTool.fromArgs(args);

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // read Wikipedia edit events and keep only the page titles
        DataStream<String> text = env.addSource(new WikipediaEditsSource()).map(WikipediaEditEvent::getTitle);

        DataStream<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .keyBy(0).sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsText(params.get("output"));
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");

            counts.print();

            CassandraSink.addSink(counts)
                    .setQuery("INSERT INTO mar1.examplewordcount(word, count) values (?, ?);")
                    .setHost("127.0.0.1")
                    .build();
        }

        // execute program
        env.execute("Streaming WordCount");
    }//main

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }

}

After running this code I got this error:

    Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the AbstractCassandraTupleSink is not serializable. The object probably contains or references non serializable fields.

How can I solve this issue?

Upvotes: 0

Views: 710

Answers (1)

kkrugler

Reputation: 9265

I tried to replicate your problem, but I didn't get the serialization error. Because I don't have a Cassandra cluster running, my job fails in the open() call instead. That happens after serialization, since open() is only called when the operator is being started by the TaskManager. So it looks like something is wrong with your dependencies, such that the wrong class is somehow being used for the actual Cassandra sink.
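To make the ordering concrete, here is a minimal plain-Java sketch. It is not Flink's actual code; it only assumes that the check behind this error amounts to standard Java serialization of the sink object when the job is built. `FakeConnection`, `EagerSink` and `LazySink` are hypothetical names made up for the illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a resource that cannot be serialized (e.g. a live connection).
class FakeConnection {
}

// Hypothetical sink that holds the resource in a regular field:
// serializing the sink fails because the field's type is not Serializable.
class EagerSink implements Serializable {
    private final FakeConnection connection = new FakeConnection();
}

// Hypothetical sink that creates the resource only in open():
// the transient field is skipped during serialization.
class LazySink implements Serializable {
    private transient FakeConnection connection;

    void open() {
        connection = new FakeConnection();
    }

    boolean isOpen() {
        return connection != null;
    }
}

class SerializationCheck {
    // Returns true if the object survives standard Java serialization.
    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("EagerSink serializable: " + isSerializable(new EagerSink()));
        System.out.println("LazySink serializable: " + isSerializable(new LazySink()));
    }
}
```

In this sketch the serialization check never calls open(), which is why a missing Cassandra cluster would only show up later, once the operator is started.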

BTW, it's always helpful to include context for your error - e.g. what version of Flink, are you running this from an IDE or on a cluster, etc.

Just FYI, here are the Flink jars on my classpath...

flink-java/1.7.0/flink-java-1.7.0.jar
flink-core/1.7.0/flink-core-1.7.0.jar
flink-annotations/1.7.0/flink-annotations-1.7.0.jar
force-shading/1.7.0/force-shading-1.7.0.jar
flink-metrics-core/1.7.0/flink-metrics-core-1.7.0.jar
flink-shaded-asm/5.0.4-5.0/flink-shaded-asm-5.0.4-5.0.jar
flink-streaming-java_2.12/1.7.0/flink-streaming-java_2.12-1.7.0.jar
flink-runtime_2.12/1.7.0/flink-runtime_2.12-1.7.0.jar
flink-queryable-state-client-java_2.12/1.7.0/flink-queryable-state-client-java_2.12-1.7.0.jar
flink-shaded-netty/4.1.24.Final-5.0/flink-shaded-netty-4.1.24.Final-5.0.jar
flink-shaded-guava/18.0-5.0/flink-shaded-guava-18.0-5.0.jar
flink-hadoop-fs/1.7.0/flink-hadoop-fs-1.7.0.jar
flink-shaded-jackson/2.7.9-5.0/flink-shaded-jackson-2.7.9-5.0.jar
flink-clients_2.12/1.7.0/flink-clients_2.12-1.7.0.jar
flink-optimizer_2.12/1.7.0/flink-optimizer_2.12-1.7.0.jar
flink-streaming-scala_2.12/1.7.0/flink-streaming-scala_2.12-1.7.0.jar
flink-scala_2.12/1.7.0/flink-scala_2.12-1.7.0.jar
flink-shaded-asm-6/6.2.1-5.0/flink-shaded-asm-6-6.2.1-5.0.jar
flink-test-utils_2.12/1.7.0/flink-test-utils_2.12-1.7.0.jar
flink-test-utils-junit/1.7.0/flink-test-utils-junit-1.7.0.jar
flink-runtime_2.12/1.7.0/flink-runtime_2.12-1.7.0-tests.jar
flink-queryable-state-runtime_2.12/1.7.0/flink-queryable-state-runtime_2.12-1.7.0.jar
flink-connector-cassandra_2.12/1.7.0/flink-connector-cassandra_2.12-1.7.0.jar
flink-connector-wikiedits_2.12/1.7.0/flink-connector-wikiedits_2.12-1.7.0.jar
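One way to compare your setup against this list is to print where each class is actually loaded from. The following is a rough sketch using only standard JDK calls; the helper `ClassOrigin` is a hypothetical name, and the three class names are just examples taken from the imports the program needs.

```java
import java.net.URL;
import java.security.CodeSource;

class ClassOrigin {
    // Returns the jar or directory a class is loaded from, without initializing the class.
    static String locate(String className) {
        try {
            Class<?> cls = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            URL location = (source == null) ? null : source.getLocation();
            return (location == null) ? "unknown (no code source)" : location.toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "not on classpath";
        }
    }

    public static void main(String[] args) {
        String[] names = {
            "org.apache.flink.streaming.connectors.cassandra.CassandraSink",
            "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment",
            "org.apache.flink.api.java.tuple.Tuple2"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

If the printed jars carry different Flink versions or different Scala suffixes (for example `_2.11` next to `_2.12`), that kind of mismatch would be the first thing to fix.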

Upvotes: 1
