Prakhar Rastogi

Reputation: 41

No subscriptions have been created error in Reactor Kafka

Consumer config file: I am using StringDeserializer for both keys and values, and the subscription is made on a single topic.

@Bean("errorReceiver")
public ReceiverOptions<Object, String> errorConsumerConfig() {
    Map<String, Object> errorConsumerProps = new HashMap<>();
    errorConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, errorBootstrapServers);
    errorConsumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "error-consumer");
    errorConsumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "error-consumer-1");
    errorConsumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    errorConsumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    ReceiverOptions<Object, String> errorReceiverOptions = ReceiverOptions.create(errorConsumerProps);
    errorReceiverOptions.subscription(Collections.singleton("order_topic"))
    .addAssignListener(partitions -> log.info("onPartitionsAssigned : {}", partitions))
    .addRevokeListener(partitions -> log.info("onPartitionsRevoked : {}", partitions));
    return errorReceiverOptions;
}
}

Consumer code: The log statement in my consumer code prints the subscribed topics as null. AppUtility transforms the data to a string.

@Autowired
@Qualifier("errorReceiver")
private ReceiverOptions<Object, String> errorReceiverOptions;

@EventListener(ApplicationStartedEvent.class)
public Disposable getErrorsTopic() {
    Flux<ReceiverRecord<Object, String>> kafkaFlux = KafkaReceiver.create(errorReceiverOptions).receive();
    log.info("subs topics : {}", errorReceiverOptions.subscriptionTopics());
    return kafkaFlux.log()
            .doOnNext(AppUtility::toBinary)
            .doOnError(error -> log.error("error occurred", error))
            .subscribe();
}

Logs:

java.lang.IllegalStateException: No subscriptions have been created
at reactor.kafka.receiver.ReceiverOptions.subscriber(ReceiverOptions.java:385) ~[reactor-kafka-1.3.4.jar:1.3.4]
at reactor.kafka.receiver.internals.ConsumerEventLoop$SubscribeEvent.run(ConsumerEventLoop.java:187) ~[reactor-kafka-1.3.4.jar:1.3.4]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) [reactor-core-3.4.6.jar:3.4.6]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) [reactor-core-3.4.6.jar:3.4.6]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_261]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_261]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.8.0_261]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_261]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_261]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_261]

pom imports:

<dependency>
    <groupId>io.projectreactor.kafka</groupId>
    <artifactId>reactor-kafka</artifactId>
</dependency>

Has anyone faced this issue? I have not been able to resolve it.

Upvotes: 1

Views: 1487

Answers (2)

Prakhar Rastogi

Reputation: 41

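I forgot the core concept of immutability in reactive programming. The options object is immutable, so calling subscription(...) returns a new instance instead of modifying the one it is called on. I solved this by assigning the returned options to another variable and returning that one (the line with the comment in the config code below).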

public ReceiverOptions<Object, String> errorConsumerConfig() {
    Map<String, Object> errorConsumerProps = new HashMap<>();
    errorConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, errorBootstrapServers);
    errorConsumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "error-consumer");
    errorConsumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "error-consumer-1");
    errorConsumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    errorConsumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    ReceiverOptions<Object, String> errorReceiverOptions = ReceiverOptions.create(errorConsumerProps);
    // Setting the subscription doesn't work unless the result is assigned to an object, the reason being immutability.
    ReceiverOptions<Object, String> options = errorReceiverOptions.subscription(Collections.singleton("order_topic"))
            .addAssignListener(partitions -> log.debug("onPartitionsAssigned : {}", partitions))
            .addRevokeListener(partitions -> log.debug("onPartitionsRevoked : {}", partitions));
    return options;
}
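
To make the immutability point concrete without needing a Kafka broker, here is a minimal sketch using a hypothetical stand-in class. DemoOptions and ImmutableOptionsDemo are names invented for this illustration and are not part of reactor-kafka. They only mimic the "each setter returns a new instance" style, and the stand-in reports an empty list where the question's log showed null.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for an immutable options type (NOT the reactor-kafka class).
final class DemoOptions {
    private final List<String> topics;

    private DemoOptions(List<String> topics) {
        this.topics = topics;
    }

    // Starts with no subscription at all.
    static DemoOptions create() {
        return new DemoOptions(Collections.<String>emptyList());
    }

    // Returns a NEW instance carrying the topics; the receiver is left unchanged.
    DemoOptions subscription(Collection<String> newTopics) {
        return new DemoOptions(Collections.unmodifiableList(new ArrayList<>(newTopics)));
    }

    List<String> subscriptionTopics() {
        return topics;
    }
}

class ImmutableOptionsDemo {
    public static void main(String[] args) {
        DemoOptions base = DemoOptions.create();

        // Mistake from the question: the returned instance is thrown away.
        base.subscription(Collections.singleton("order_topic"));
        System.out.println("discarded result: " + base.subscriptionTopics());

        // Fix from this answer: keep and use the returned instance.
        DemoOptions subscribed = base.subscription(Collections.singleton("order_topic"));
        System.out.println("kept result: " + subscribed.subscriptionTopics());
    }
}
```

Running main prints "discarded result: []" followed by "kept result: [order_topic]". The original bean returned the equivalent of base, which is why the receiver saw no subscription.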

Edit: Alternatively, try setting the subscription topic in your listener code.

Upvotes: 3

Ran Lupovich

Reputation: 1821

Hi, please have a look at the example linked below. According to that example, you are missing something like the following code:

    return kafkaFlux.subscribe(record -> {
        ReceiverOffset offset = record.receiverOffset();
        System.out.printf("Received message: topic-partition=%s offset=%d timestamp=%s key=%d value=%s\n",
                offset.topicPartition(),
                offset.offset(),
                dateFormat.format(new Date(record.timestamp())),
                record.key(),
                record.value());
        offset.acknowledge();
        latch.countDown();
    });

https://github.com/reactor/reactor-kafka/blob/main/reactor-kafka-samples/src/main/java/reactor/kafka/samples/SampleConsumer.java

Upvotes: 0
