Reputation: 41
I would like to periodically call KafkaMessageListenerContainer.getAssignedPartitions()
from an arbitrary thread.
The method is public and thus expected to be safe.
In my case getAssignedPartitions
operates on partitionsListenerConsumer.assignedPartitions
field of the ListenerConsumer instance (i'm using consumer.subscribe).
This field is a plain LinkedHashSet
and can be modified by ListenerConsumerRebalanceListener
and at the same time it can be copied and read by an arbitrary thread.
It looks like that getAssignedPartitions
can't be safely used in my case. But why then it is public?
Am I missing something? Or it is a bug?
Upvotes: 0
Views: 52
Reputation: 121552
I'm not sure why you find it as unsafe since that method returns an unmodifiable copy:
public Collection<TopicPartition> getAssignedPartitions() {
ListenerConsumer partitionsListenerConsumer = this.listenerConsumer;
if (partitionsListenerConsumer != null) {
if (partitionsListenerConsumer.definedPartitions != null) {
return Collections.unmodifiableCollection(partitionsListenerConsumer.definedPartitions.keySet());
}
else if (partitionsListenerConsumer.assignedPartitions != null) {
return Collections.unmodifiableCollection(partitionsListenerConsumer.assignedPartitions);
}
}
return null;
}
UPDATE
Now I see after your further explanation and what is there in LinkedHashSet
JavaDocs:
Note that this implementation is not synchronized. If multiple threads access a linked hash set concurrently, and at least one of the threads modifies the set, it must be synchronized externally. This is typically accomplished by synchronizing on some object that naturally encapsulates the set. If no such object exists, the set should be "wrapped" using the Collections. synchronizedSet method.
Then looking into the mentioned Collections.unmodifiableCollection()
I agree with you that iterating over such a collection may lead to the ConcurrentModificationException
.
Please, raise a GitHub issue is spring-kafka
project, so this property can be guarded with a Collections. synchronizedSet()
as it is recommended from the mentioned JavaDocs.
Upvotes: 0