user26270

Reputation: 7074

Why can't Spring find the @Source bean channel created by Spring Cloud Stream?

I'm trying to use Spring Cloud Stream to publish and consume Kafka messages. I've been working from the documentation section on Accessing Bound Channels. I want a custom name on the channel for my topic, so I use a @Qualifier when injecting it, but Spring can't find the relevant bean. The documentation says "For each bound interface, Spring Cloud Stream will generate a bean that implements the interface", but the auto-wiring isn't working.

The error I'm getting is "Parameter 0 of constructor in com...MessagingManager required a bean of type 'org.springframework.messaging.MessageChannel' that could not be found."

I tried putting @Autowired on the MessagingManager constructor as in the example, but then got a similar error from the bean factory saying there were 2 matching beans, so I took it out and got the current error.

It's probably complicated by my trying to use a Processor.

Here are my components. I'm running it with Spring Boot and trying to test it with this:

@Component
public class StartupTester implements ApplicationListener<ContextRefreshedEvent> {
    MessagingManager messagingManager;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        messagingManager.sendThingCreatedMessage(new ThingCreated("12345", "667788"));
    }
}

@Component
public class MessagingManager {

    private MessageChannel thingCreatedChannel;

    public MessagingManager(@Qualifier(ThingsChannelProcessor.THING_CREATED) MessageChannel output) {
        thingCreatedChannel = output;
    }

    public void sendThingCreatedMessage(ThingCreated thingCreated) {
        thingCreatedChannel.send(MessageBuilder.withPayload(thingCreated).build());
    }
}


@Component
public interface ThingsChannelProcessor extends Processor {

    String THING_REQUEST = "thing-request";
    String THING_CREATED = "thing-created";

    @Input(THING_REQUEST)
    SubscribableChannel thingsRequest();

    @Output(THING_CREATED)
    MessageChannel thingCreated();
}

And I also have @EnableBinding(ThingsMessagingManager.class) on my main class which is annotated with @SpringBootApplication.

Upvotes: 3

Views: 1971

Answers (1)

Vinicius Carvalho

Reputation: 4146

I could not reproduce your error. But I have a few points you could follow:

  1. You don't need to annotate the interface with @Component.
  2. It seems that you have a typo in your @EnableBinding: it should be @EnableBinding(ThingsChannelProcessor.class), not ThingsMessagingManager.
  3. You don't need to extend Processor either; that may be the reason you got 2 beans the first time. If you are customizing your channels, you don't need to extend Sink/Source/Processor. Look at the Barista example in the docs.
  4. Listening for a ContextRefreshedEvent won't work either, as we do the binding after the context has been refreshed.
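As a rough sketch of points 1 to 3, the binding interface from the question could be reduced to something like the following. The interface, constant, and method names are taken from the question; the imports assume the annotation-based Spring Cloud Stream API that the question is using. Treat it as an illustration, not a verified fix, since I could not reproduce the original error.

```java
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

// No @Component and no "extends Processor": the interface only declares
// the custom channels, and @EnableBinding generates the implementation.
public interface ThingsChannelProcessor {

    String THING_REQUEST = "thing-request";
    String THING_CREATED = "thing-created";

    // Inbound channel bound under the custom name "thing-request"
    @Input(THING_REQUEST)
    SubscribableChannel thingsRequest();

    // Outbound channel bound under the custom name "thing-created"
    @Output(THING_CREATED)
    MessageChannel thingCreated();
}
```

The main class would then carry @EnableBinding(ThingsChannelProcessor.class). Without the inherited Processor channels, the qualifier "thing-created" should be the only MessageChannel declared by this interface.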

Actually, let me be a bit clearer on 4. We create a child context, so to make sure that your own context has fully initialized, also implement ApplicationContextAware on your starter class. Before sending the message, check that the event came from the same context, otherwise you will get an error: if (this.context.equals(event.getApplicationContext()))
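Put together, the listener might look roughly like this. StartupTester, MessagingManager, and ThingCreated come from the question and are assumed to be in the same package; the constructor injection of MessagingManager is my addition, since the snippet in the question never assigns that field. It is a sketch under those assumptions rather than tested code.

```java
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

@Component
public class StartupTester
        implements ApplicationListener<ContextRefreshedEvent>, ApplicationContextAware {

    // MessagingManager is the class from the question (assumed same package)
    private final MessagingManager messagingManager;

    // The context this bean lives in, set by Spring via ApplicationContextAware
    private ApplicationContext context;

    // Assumption: constructor injection, not shown in the original snippet
    public StartupTester(MessagingManager messagingManager) {
        this.messagingManager = messagingManager;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext)
            throws BeansException {
        this.context = applicationContext;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // Only react to the refresh of our own context and skip
        // refresh events that originate from other contexts.
        if (this.context.equals(event.getApplicationContext())) {
            messagingManager.sendThingCreatedMessage(new ThingCreated("12345", "667788"));
        }
    }
}
```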

Upvotes: 4
