xksa

Reputation: 127

My FlinkKafkaProducer011 accepts fewer arguments than it is supposed to

I’m having difficulties with the Flink Kafka connector.

Although I have imported org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011, I get the following error when I call its constructor:

Too many arguments for constructor FlinkKafkaProducer011

According to this site, the arguments are correct. However, IntelliJ shows only this constructor in the class definition:

public FlinkKafkaProducer011() {
}

Does anyone know how to fix it?


UPDATE

Here is the IDE definition.

public class FlinkKafkaProducer011 {
    public FlinkKafkaProducer011() {
    }

    public static class NextTransactionalIdHint extends org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.NextTransactionalIdHint {
        public NextTransactionalIdHint() {
        }
    }

    public static class TransactionStateSerializer {
        public TransactionStateSerializer() {
        }

        public static final class TransactionStateSerializerSnapshot extends SimpleTypeSerializerSnapshot<KafkaTransactionState> {
            public TransactionStateSerializerSnapshot() {
                super(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.TransactionStateSerializer::new);
            }
        }
    }

    public static class ContextStateSerializer {
        public ContextStateSerializer() {
        }

        public static final class ContextStateSerializerSnapshot extends SimpleTypeSerializerSnapshot<KafkaTransactionContext> {
            public ContextStateSerializerSnapshot() {
                super(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.ContextStateSerializer::new);
            }
        }
    }

    public static class NextTransactionalIdHintSerializer {
        public NextTransactionalIdHintSerializer() {
        }

        public static final class NextTransactionalIdHintSerializerSnapshot extends SimpleTypeSerializerSnapshot<org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.NextTransactionalIdHint> {
            public NextTransactionalIdHintSerializerSnapshot() {
                super(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.NextTransactionalIdHintSerializer::new);
            }
        }
    }
}

Upvotes: 1

Views: 540

Answers (1)

Mikalai Lushchytski

Reputation: 1641

Could you please specify which flink-connector-kafka version you use?

From what I can see, the org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011 you have used is just a compatibility class from the flink-connector-kafka_2.11 jar, which is why it only has a no-argument constructor. The right producer to use with that jar is org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.

As far as I understand, the Javadoc at https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.html describes the org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011 producer from the flink-connector-kafka-0.11_2.11 artifact, which is a different class that shares the same fully qualified name.

So, I would suggest either switching to org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer or changing the dependency to the flink-connector-kafka-0.11_2.11 jar.
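If it is unclear which jar the class is actually coming from, a small reflection check can help confirm it. The following is only a rough sketch using plain JDK reflection, not anything from Flink itself. The helper class name ClassOrigin and its two methods are hypothetical names made up for this illustration. It assumes you run it with the same classpath as your job.

```java
import java.lang.reflect.Constructor;
import java.security.CodeSource;
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic helper (not part of Flink): reports where a class
// was loaded from and which public constructors it exposes.
public class ClassOrigin {

    // Returns the jar or directory the class was loaded from,
    // or a placeholder for classes loaded by the JDK bootstrap loader.
    public static String locationOf(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        if (src == null || src.getLocation() == null) {
            return "(bootstrap/JDK)";
        }
        return src.getLocation().toString();
    }

    // Returns the signatures of all public constructors of the class.
    public static List<String> publicConstructors(Class<?> cls) {
        List<String> signatures = new ArrayList<>();
        for (Constructor<?> c : cls.getConstructors()) {
            signatures.add(c.toGenericString());
        }
        return signatures;
    }

    public static void main(String[] args) {
        // Class name taken from the question; pass another name as the first argument.
        String name = args.length > 0
                ? args[0]
                : "org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011";
        try {
            // 'false' avoids running static initializers of the inspected class.
            Class<?> cls = Class.forName(name, false, ClassOrigin.class.getClassLoader());
            System.out.println("Loaded from: " + locationOf(cls));
            for (String signature : publicConstructors(cls)) {
                System.out.println("Constructor: " + signature);
            }
        } catch (ClassNotFoundException e) {
            System.out.println(name + " is not on the classpath");
        }
    }
}
```

If the printed location points at the flink-connector-kafka_2.11 jar and the only constructor listed takes no arguments, you are looking at the compatibility class rather than the producer described in the 1.4 Javadoc.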

Upvotes: 2
