Chandan Mishra

Reputation: 73

Maybe not public or not valid? Using Spring's Websocket and Kafka

I am trying to consume messages from a Kafka topic whose name is based on the user, so the topic has to be resolved at runtime. When I do, I get the following error:

Caused by: org.springframework.expression.spel.SpelEvaluationException: EL1008E: Property or field 'consumerProperties' cannot be found on object of type 'org.springframework.beans.factory.config.BeanExpressionContext' - maybe not public or not valid?

Here is my code:

@Service
public class kafkaConsumerService {

    private SimpMessagingTemplate template;

    KafkaConsumerProperties consumerProperties;

    @Autowired
    public kafkaConsumerService(KafkaConsumerProperties consumerProperties, SimpMessagingTemplate template) {
        this.consumerProperties = consumerProperties;
        this.template = template;
    }

    @KafkaListener(topics = {"#{consumerProperties.getTopic()}"})
    // @KafkaListener(topics="Chandan3706")
    public void consume(@Payload Message message) {
        System.out.println("from kafka topic::" + message);
        template.convertAndSend("/chat/getMessage", message);
    }

}

My KafkaConsumerProperties class:

@Component
@ConfigurationProperties(prefix="kafka.consumer")
public class KafkaConsumerProperties {

    private String bootStrap;
    private String group;
    private String topic;

    public String getBootStrap() {
        return bootStrap;
    }

    public void setBootStrap(String bootStrap) {
        this.bootStrap = bootStrap;
    }

    public String getGroup() {
        return group;
    }

    public void setGroup(String group) {
        this.group = group;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    @Override
    public String toString() {
        return "KafkaConsumerProperties [bootStrap=" + bootStrap + ", group=" + group + ", topic=" + topic + "]";
    }
}

Thanks in advance

Upvotes: 2

Views: 5063

Answers (2)

Tzofiya Tzadok

Reputation: 1

The following code worked for me. Note the @DependsOn("KafkaConsumerProperties") and @Component("KafkaConsumerProperties") annotations, and the @ prefix that references the bean by name inside the expression.

KafkaConsumerService class:

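@Service
@DependsOn("KafkaConsumerProperties")
public class KafkaConsumerService {

   // Injected through the constructor; forwards messages to WebSocket subscribers
   private final SimpMessagingTemplate template;

   public KafkaConsumerService(SimpMessagingTemplate template) {
       this.template = template;
   }

   // "@KafkaConsumerProperties" references the bean by its explicit name
   @KafkaListener(topics = "#{@KafkaConsumerProperties.getTopic()}")
   public void consume(@Payload Message message) {
       System.out.println("from kafka topic::" + message);
       template.convertAndSend("/chat/getMessage", message);
   }
}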

KafkaConsumerProperties class:

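@Component("KafkaConsumerProperties")
@ConfigurationProperties(prefix="kafka.consumer")
public class KafkaConsumerProperties {

   private String topic;

   public String getTopic() {
       return topic;
   }

   // Setter so that kafka.consumer.topic can be bound to this field
   public void setTopic(String topic) {
       this.topic = topic;
   }
}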

Upvotes: 0

Artem Bilan

Reputation: 121282

Since you don’t provide a bean name for your KafkaConsumerProperties component, the default one is the de-capitalized class name: kafkaConsumerProperties. That’s one.
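As a rough illustration of the default-name rule: to my knowledge, Spring derives the default name with JavaBeans-style de-capitalization, which is the same rule java.beans.Introspector.decapitalize implements. The sketch below is plain Java only; the class DefaultBeanNameSketch and its helper defaultBeanName are hypothetical names made up for this example, not Spring API.

```java
import java.beans.Introspector;

public class DefaultBeanNameSketch {

    // Hypothetical helper: approximates the default bean name for a
    // component that declares no explicit name, given its simple class name.
    static String defaultBeanName(String simpleClassName) {
        return Introspector.decapitalize(simpleClassName);
    }

    public static void main(String[] args) {
        // prints kafkaConsumerProperties
        System.out.println(defaultBeanName("KafkaConsumerProperties"));
        // already starts with a lower-case letter, so it is unchanged:
        // prints kafkaConsumerService
        System.out.println(defaultBeanName("kafkaConsumerService"));
    }
}
```

One caveat of this rule: a name whose first two characters are both upper case, such as URLService, is left unchanged.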

The expression you use in @KafkaListener is a regular bean-definition-phase expression, so its root object is a BeanExpressionContext, not your listener bean. That is why the consumerProperties field of the listener cannot be reached as a property.

I'm not sure you need the KafkaConsumerProperties property in this listener at all, but the expression must ask for the kafkaConsumerProperties bean:

@Service
public class kafkaConsumerService {

    private SimpMessagingTemplate template;

    @Autowired
    public kafkaConsumerService(SimpMessagingTemplate template) {
        this.template = template;
    }

    @KafkaListener(topics = {"#{kafkaConsumerProperties.topic}"})
    // @KafkaListener(topics="Chandan3706")
    public void consume(@Payload Message message) {
        System.out.println("from kafka topic::" + message);
        template.convertAndSend("/chat/getMessage", message);
    }

}
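
The difference between the two expressions can be pictured with a toy model. This is only a sketch of the idea, not how Spring's expression evaluation is actually implemented: BeanLookupSketch, register and resolve are hypothetical names, and a plain map stands in for the application context. The point it tries to show is that the first identifier of the expression is looked up among bean names, while the fields of the listener class are never consulted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

public class BeanLookupSketch {

    // Toy stand-in for the application context: bean name -> bean instance.
    private final Map<String, Object> beans = new HashMap<>();

    void register(String name, Object bean) {
        beans.put(name, bean);
    }

    // Resolves an identifier against the registered bean names only.
    Object resolve(String name) {
        Object bean = beans.get(name);
        if (bean == null) {
            throw new NoSuchElementException(
                    "Property or field '" + name + "' cannot be found");
        }
        return bean;
    }

    public static void main(String[] args) {
        BeanLookupSketch context = new BeanLookupSketch();
        // Default names: the de-capitalized class names.
        context.register("kafkaConsumerProperties", "properties bean");
        context.register("kafkaConsumerService", "listener bean");

        // The bean name resolves.
        System.out.println(context.resolve("kafkaConsumerProperties"));

        // The listener's field name is not a bean name, so the lookup fails.
        try {
            context.resolve("consumerProperties");
        } catch (NoSuchElementException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model, consumerProperties fails for the same reason as in the question: it names a field of the listener, not a bean.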

Upvotes: 2
