Logan

Reputation: 11

How to manually commit offsets in a Kafka source module after getting an acknowledgment from a Kafka sink module in Spring XD?

In the XD stream, messages are consumed from a Kafka topic through a source module and then sent to a Kafka sink module. I developed custom source and sink Kafka modules because I want the source module to commit its offsets only after the downstream sink module acknowledges that the messages were sent successfully.

I am using Spring Integration Kafka 2.0.1.RELEASE and Spring Kafka 1.0.3.RELEASE with topics in a Kafka 0.10.0.0 environment. I have tried the following:

Source Module Configuration:

@Configuration
public class ModuleConfiguration {

    @Value("${topic}")
    private String topic;

    @Value("${brokerList}")
    private String brokerAddress;

    @Bean
    public SubscribableChannel output() {
        DirectChannel output = new DirectChannel();
        return output;
    }

    @Autowired
    TopicPartitionInitialOffset topicPartition;

    @Bean
    public TopicPartitionInitialOffset topicPartition(){
        return new TopicPartitionInitialOffset(this.topic, 0, (long) 0);    
    }

    @Bean
    public KafkaMessageListenerContainer<String, String> container() throws Exception {
        ContainerProperties containerProps = new ContainerProperties(topicPartition);
        containerProps.setAckMode(AckMode.MANUAL);
        KafkaMessageListenerContainer<String, String> kafkaMessageListenerContainer = new KafkaMessageListenerContainer<>(consumerFactory(),containerProps);
        return kafkaMessageListenerContainer;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        DefaultKafkaConsumerFactory<String,String> consumerFactory =  new DefaultKafkaConsumerFactory<>(props);
        return consumerFactory;
    }
}

Source Module: InboundKafkaMessageDrivenAdapter

@MessageEndpoint
@Import(ModuleConfiguration.class)
public class InboundKafkaMessageDrivenAdapter {

    @Autowired
    KafkaMessageListenerContainer<String, String> container;

    @Autowired
    SubscribableChannel output;

    @Bean
    public KafkaMessageDrivenChannelAdapter<String, String> adapter(KafkaMessageListenerContainer<String, String> container) {
        KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter<>(container);
        kafkaMessageDrivenChannelAdapter.setOutputChannel(output);
        return kafkaMessageDrivenChannelAdapter;
    }
}

Sink Module: Configuration

@Configuration
@EnableIntegration
public class ModuleConfiguration {

    @Value("${topic}")
    private String topic;

    @Value("${brokerList}")
    private String brokerAddress;

    @Bean
    public KafkaProducerMessageHandler<String,String> handler() throws Exception {
        KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<>(kafkaTemplate());
        handler.setTopicExpression(new LiteralExpression(this.topic));
        return handler;
    }

    @Bean
    public SubscribableChannel input() {
        return new DirectChannel();
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }
}

Sink Module: SinkActivator

@Import(ModuleConfiguration.class)
@MessageEndpoint
public class SinkActivator {

    @Autowired
    KafkaProducerMessageHandler<String,String> handler;

    @Autowired
    SubscribableChannel input;

    @ServiceActivator(inputChannel = "input")
    public void sendMessage(Message<?> msg) throws Exception {
        Acknowledgment acknowledgment = msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        handler.handleMessage(msg);
        acknowledgment.acknowledge();
    }
}

The source successfully receives the messages and sends them to the sink. However, when I try to get the Acknowledgment in the sink:

Acknowledgment acknowledgment = msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);

The following exception is thrown:

Caused by: java.lang.IllegalArgumentException: Incorrect type specified for header 'kafka_acknowledgment'. Expected [interface org.springframework.kafka.support.Acknowledgment] but actual type is [class org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ConsumerAcknowledgment]

In the source code of KafkaMessageListenerContainer (spring-kafka 1.0.3.RELEASE, used by spring-integration-kafka 2.0.1.RELEASE), when AckMode is MANUAL a kafka_acknowledgment header is added to the message; however, its concrete type is the static inner class ConsumerAcknowledgment.
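
For context, the typed header lookup in Spring Messaging rejects any value that is not an instance of the requested type as seen by the caller. Roughly, it behaves like the following sketch (a simplified illustration of the check, not the actual Spring source):

// Simplified sketch of what MessageHeaders.get(String, Class) does: the stored
// ConsumerAcknowledgment must be assignable to the Acknowledgment interface as
// loaded by the caller's class loader, otherwise the lookup throws.
Object value = msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT);
if (value != null && !Acknowledgment.class.isAssignableFrom(value.getClass())) {
    throw new IllegalArgumentException("Incorrect type specified for header '"
            + KafkaHeaders.ACKNOWLEDGMENT + "'. Expected [" + Acknowledgment.class
            + "] but actual type is [" + value.getClass() + "]");
}
Acknowledgment acknowledgment = (Acknowledgment) value;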

So how do I get the Acknowledgment in the sink module for a message sent from the source?

Upvotes: 1

Views: 1465

Answers (1)

Gary Russell

Reputation: 174719

Unless you are using the local transport, you can't do that; the Acknowledgment is a "live" object and cannot be sent over the wire to another module.

If you are using the local transport it will work, but you will have class loader problems, because each module runs in its own class loader and the Acknowledgment interface is loaded as a different Class instance in each.
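
To make the class loader issue concrete, here is a minimal, self-contained sketch (the Marker interface is a placeholder standing in for Acknowledgment, not a Spring type): the "same" class loaded by two isolated class loaders yields two distinct Class objects, so an instance created through one loader is not assignable to the type as the other loader sees it.

import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderIsolationDemo {

    // Placeholder interface standing in for Acknowledgment.
    public interface Marker { }

    public static void main(String[] args) throws Exception {
        // Location of this demo's compiled classes (a directory or jar,
        // depending on how the demo is run).
        URL classes = ClassLoaderIsolationDemo.class
                .getProtectionDomain().getCodeSource().getLocation();

        // parent = null: neither loader delegates to the application class
        // loader, mimicking two Spring XD modules with isolated class loaders.
        try (URLClassLoader moduleA = new URLClassLoader(new URL[] { classes }, null);
             URLClassLoader moduleB = new URLClassLoader(new URL[] { classes }, null)) {

            Class<?> markerInA = moduleA.loadClass(Marker.class.getName());
            Class<?> markerInB = moduleB.loadClass(Marker.class.getName());

            System.out.println(markerInA == markerInB);                // false
            System.out.println(markerInA.isAssignableFrom(markerInB)); // false
        }
    }
}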

You would have to move spring-integration-kafka and spring-kafka to the xd/lib folder so the classes are loaded from a common classloader.

Upvotes: 0
