anjey
anjey

Reputation: 93

Producer for already provided RabbitMq via Spring Cloud Stream

I find myself in an environment where the rabbitmq's are already provided as given infrastructure.

A Service A is writing into the rabbit queues and Service B is reading from the queues. The reading part as a consumer via Spring Cloud Stream Binder is working like a charme after intensive reading the documentation and setting it up properly.

However i am not able to set up a producer which is going to write into the rabbit queues.


Setup

RabbitMq (already up and running)


Producer-Service (above mentioned Service A)

@Slf4j
@Component
@RequiredArgsConstructor
@EnableBinding(RabbitChannelSource.class)
public class RabbitSender
{
    private final RabbitChannelSource rabbitChannelSource;


    public void sendMessage()
    {
        Message<String> msg = MessageBuilder.withPayload("TEEEEST").build();
        rabbitChannelSource.myOutput().send(msg);
    }
public interface RabbitChannelSource
{
    String MY_OUTPUT_BINDING = "my-output";

    @Output(MY_OUTPUT_BINDING)
    MessageChannel myOutput();

}

I am trying to make it work first for one queue, but ideally i would set the properties right for all three queues.

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=user
spring.rabbitmq.password=pass

## use existing rabbitmq via bindings
spring.cloud.stream.bindings.my-output.destination=my-output
#spring.cloud.stream.bindings.my-output.group=myQueueA
spring.cloud.stream.bindings.my-output.producer.required-groups=myQueueA
spring.cloud.stream.rabbit.bindings.my-output.producer.bind-queue=false
spring.cloud.stream.rabbit.bindings.my-output.producer.declare-exchange=false
spring.cloud.stream.rabbit.bindings.my-output.producer.queueNameGroupOnly=true

bind-queue=false and declare-exchange=false is necessary since i have rabbit infrastructure.


But however i am getting always the same exception, i could not figure out why. I mean i know why, because there is no appropriate channel istantiated to send messages to. So i am suspecting it has something to do with the application.properties.

org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.my-output'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[7], headers={contentType=application/json, id=98698b4c-61fa-596d-736e-f630d3ba4626, timestamp=1605105272723}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
    at de.techem.emsreceiver.rabbitmq.RabbitSender.sendMessage(RabbitSender.java:25)
    at de.techem.emsreceiver.event.TriggeredEmsImport.execute(TriggeredImport.java:95)
    at de.techem.emsreceiver.event.TriggeredEmsImport$$FastClassBySpringCGLIB$$d84f31a4.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:769)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
    at org.springframework.aop.aspectj.AspectJAfterThrowingAdvice.invoke(AspectJAfterThrowingAdvice.java:62)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:175)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
    at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:95)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
    at de.techem.emsreceiver.event.TriggeredEmsImport$$EnhancerBySpringCGLIB$$a0878aed.execute(<generated>)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.context.event.ApplicationListenerMethodAdapter.doInvoke(ApplicationListenerMethodAdapter.java:305)
    at org.springframework.context.event.ApplicationListenerMethodAdapter.processEvent(ApplicationListenerMethodAdapter.java:190)
    at org.springframework.context.event.ApplicationListenerMethodAdapter.onApplicationEvent(ApplicationListenerMethodAdapter.java:153)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139)
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:403)
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:409)
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:360)
    at org.springframework.boot.context.event.EventPublishingRunListener.running(EventPublishingRunListener.java:103)
    at org.springframework.boot.SpringApplicationRunListeners.running(SpringApplicationRunListeners.java:77)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:330)
    at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:140)
    at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinderInstance(DefaultBinderFactory.java:320)
    at org.springframework.cloud.stream.binder.DefaultBinderFactory.doGetBinder(DefaultBinderFactory.java:209)
    at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinder(DefaultBinderFactory.java:140)
    at org.springframework.cloud.stream.binding.BindingService.getBinder(BindingService.java:379)
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:268)
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:291)
    at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindOutputs(AbstractBindableProxyFactory.java:136)
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:58)
    at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57)
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:34)
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:894)
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:162)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553)
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747)
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215)
    at de.techem.emsreceiver.EmsReceiverApplication.main(EmsReceiverApplication.java:21)

I want to be able to write into myQueueA, myQueueB or myQueueC based on a routingKey. So if i set routingKey inside my application to myQA the message should be send to myQueueA, for myQB then to myQueueB, and for myQC then to myQueueC.

So three different routing keys lead to the corresponding rabbit queue.


I am happy to any input since i have tried many things which did not lead to me any success. Thank you!

Upvotes: 0

Views: 2038

Answers (2)

anjey
anjey

Reputation: 93

After Gary's suggestion i get the same error as in my first post.

application.properties

# --- spring cloud ---
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=username
spring.rabbitmq.password=password
## use existing rabbitmq via bindings
spring.cloud.stream.bindings.output.destination=my-output
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers['routeTo']

RabbitTrigger.java

@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitTrigger
{
    private final RabbitSender rabbitSender;


    @EventListener(ApplicationReadyEvent.class)
    public void execute()
    {
        rabbitSender.testSending();
    }

}

RabbitSender.java

@Slf4j
@Configuration
@RequiredArgsConstructor
@EnableBinding(Source.class)
public class RabbitSender
{
    private final Source source;


    public void testSending()
    {
        Message<String> testMessage = MessageBuilder.withPayload("TEEEEST")
                                                    .setHeader("routeTo", "CRE")
                                                    .build();
        source.output().send(testMessage);
    }

But i get the same exception as i was getting in my initial post. I assume i did not consider a crucial part to this?

2020-11-25 16:27:17,116 [main] ERROR o.s.boot.SpringApplication - Application run failed
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.output'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[7], headers={routeTo=CRE, id=15479810-375f-38e8-a692-e84c6ede01d7, contentType=application/json, timestamp=1606318037108}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
    at de.techem.emsreceiver.rabbitmq.RabbitSender.testSending(RabbitSender.java:34)
    at de.techem.emsreceiver.event.RabbitTrigger.execute(RabbitTrigger.java:96)
    at de.techem.emsreceiver.event.RabbitTrigger$$FastClassBySpringCGLIB$$d84f31a4.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:769)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
    at org.springframework.aop.aspectj.AspectJAfterThrowingAdvice.invoke(AspectJAfterThrowingAdvice.java:62)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:175)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
    at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:95)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
    at de.techem.emsreceiver.event.RabbitTrigger$$EnhancerBySpringCGLIB$$d00eff49.execute(<generated>)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.context.event.ApplicationListenerMethodAdapter.doInvoke(ApplicationListenerMethodAdapter.java:305)
    at org.springframework.context.event.ApplicationListenerMethodAdapter.processEvent(ApplicationListenerMethodAdapter.java:190)
    at org.springframework.context.event.ApplicationListenerMethodAdapter.onApplicationEvent(ApplicationListenerMethodAdapter.java:153)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139)
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:403)
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:409)
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:360)
    at org.springframework.boot.context.event.EventPublishingRunListener.running(EventPublishingRunListener.java:103)
    at org.springframework.boot.SpringApplicationRunListeners.running(SpringApplicationRunListeners.java:77)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:330)
    at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:140)
    at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinderInstance(DefaultBinderFactory.java:320)
    at org.springframework.cloud.stream.binder.DefaultBinderFactory.doGetBinder(DefaultBinderFactory.java:209)
    at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinder(DefaultBinderFactory.java:140)
    at org.springframework.cloud.stream.binding.BindingService.getBinder(BindingService.java:379)
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:268)
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:291)
    at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindOutputs(AbstractBindableProxyFactory.java:136)
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:58)
    at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57)
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:34)
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:894)
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:162)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553)
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747)
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215)
    at de.techem.emsreceiver.EmsReceiverApplication.main(EmsReceiverApplication.java:21)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    ... 60 common frames omitted

Setting up the producer is way more complicated for me than it was for the consumer part.

Upvotes: 0

Gary Russell
Gary Russell

Reputation: 174769

Perhaps you are trying to use the binding before it is bound? This works fine for me:

spring.cloud.stream.bindings.output.destination=my-output
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers['routeTo']
@SpringBootApplication
@EnableBinding(Source.class)
public class So64788954Application {

    public static void main(String[] args) {
        SpringApplication.run(So64788954Application.class, args);
    }

    @Autowired
    Source source;

    @Bean
    ApplicationRunner runner() {
        return args -> {
            source.output().send(MessageBuilder.withPayload("foo")
                    .setHeader("routeTo", "myQA")
                    .build());

            source.output().send(MessageBuilder.withPayload("bar")
                    .setHeader("routeTo", "myQB")
                    .build());

            source.output().send(MessageBuilder.withPayload("baz")
                    .setHeader("routeTo", "myQC")
                    .build());

        };
    }

    @RabbitListener(queues = { "myQueueA", "myQueueB", "myQueueC" } )
    void listen(String in, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
        System.out.println(in + ", from: " + queue);
    }

}
foo, from: myQueueA
bar, from: myQueueB
baz, from: myQueueC

You don't need required groups on the producer side when you bring your own queues/exchanges.

The @RabbitListener` is just there to consume the messages sent to the 3 queues.

By the way, the annotation model is deprecated; the functional programming model is now preferred, where we use a StreamBridge for output (and Consumer<?> or Function<?, ?> to consume and process respectively).

Here is the equivalent of the above:

spring.cloud.stream.rabbit.bindings.my-output.producer.declare-exchange=false
spring.cloud.stream.rabbit.bindings.my-output.producer.routing-key-expression=headers['routeTo']
@SpringBootApplication
public class So64788954Application {

    public static void main(String[] args) {
        SpringApplication.run(So64788954Application.class, args);
    }

    @Autowired
    StreamBridge bridge;

    @Bean
    ApplicationRunner runner() {
        return args -> {
            bridge.send("my-output", MessageBuilder.withPayload("foo")
                    .setHeader("routeTo", "myQA")
                    .build());

            bridge.send("my-output", MessageBuilder.withPayload("bar")
                    .setHeader("routeTo", "myQB")
                    .build());

            bridge.send("my-output", MessageBuilder.withPayload("baz")
                    .setHeader("routeTo", "myQC")
                    .build());

        };
    }

    @RabbitListener(queues = { "myQueueA", "myQueueB", "myQueueC" } )
    void listen(String in, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
        System.out.println(in + ", from: " + queue);
    }

}

Upvotes: 1

Related Questions