Reputation: 47
I'm trying to use Kafka connect to write data using the standalone mode. The topic that I'm writing the data to, is having multiple partitions. However, the data is being written to only one of the partitions. When I start multiple consumer consoles, the data is printed to only one of them. The other consumer console get any data only after the 1st one is closed. I am not able to figure out what change do I need to make in the configuration file to make it write to multiple partitions.
Here is the standalone.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=1000
rest.port=8084
connect-file-source.properties:
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test4.txt
topic=consumer_group
Now I'm using the following command to run the connector:
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties
Using the following to start consumer consoles:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic consumer_group --from-beginning --consumer-property group.id=new-consumer-group
It keeps printing data to one of the consumer consoles only. However, if I use a producer console instead of Kafka connect to write messages, then I can see the messages on multiple consumers (in a round robin fashion), the way it should be. But using Kafka connect, it is only writing all the data to single partition and other consumers in the same group have to sit idle. What needs to be changed to make it write to all partitions in round robin system?
Upvotes: 2
Views: 2762
Reputation: 7197
This answer applies to Apache Kafka 0.10.2.1, but may not necessarily apply to future versions.
As you may know, the file source connector generates messages with a null
key and null
topic partition number. That means it is up to Kafka Connect's producer to assign a topic partition using it's partitioner, and for messages with a null key the default partitioner will attempt to round-robin the messages to the available partitions.
However, you're running into one of the quirks of the JSON converter, which is configured in the standalone.properties
file via the key.converter
and value.converter
properties:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
When the JSON Converter is configured to enable schemas, then the JSON representation includes an envelope around the value so that the key or value contain both the schema and payload:
{
"schema": ...,
"payload": ...
}
Your standalone.properties
file configures the key's converter with schemas enabled, so even though the connector generates messages with null
keys and null
schemas, the JSON converter (with schemas enabled) always wraps these in an envelope. Thus, every message's key will be:
{
"schema": null,
"payload": null
}
The producer's default partitioner will always hash these identical keys to the same partition.
To change the behavior, edit your standalone.properties
files and change the key.converter.schemas.enable
property to false
:
key.converter.schemas.enable=false
You can optionally change the value.converter.schemas.enable
property to false
to change how the value is written to not wrap the value in the envelope and include the schema:
value.converter.schemas.enable=false
This also plays into how the converters deal with null values, which some connectors generate when the source entity with a particular key is removed. For example, some change data capture connectors do this when a row is deleted from the source database. This works great with log compacted topics, since each message represents the last known state of the keyed entity, and because a null value corresponds to a tombstone record telling Kafka that all messages with the same key prior to that tombstone can all be removed from the log. But, if configuring the value converter to be a JSON Converter with schemas enabled will never output a null
message value, so log compaction never removes the tombstone message. It's a minor issue, but one to be aware of.
If you want to encode your keys and values in JSON, then chances are you won't need or want the schemas and can thus turn of the schemas.enable
for both they key and value JSON converters.
For those really using schemas, consider using Confluent's Schema Registry and the Avro Converters. Not only are the encoded messages significantly smaller (due to the Avro encoding rather than JSON string encoding), the encoded messages include the ID of the Avro schema and thus allow you to evolve your message schemas over time without having to coordinate upgrading your producers and consumers to use the exact same schemas. There are all kinds of advantages!
Upvotes: 7