Reputation: 1006
The exact usage is like this:
@Slf4j
public class Client<E, Key> {
@Getter @NonNull private final UpdateListener<E, Key> updateListener;
@NonNull private final SubscriptionFactory subscriptionFactory;
@NonNull private final Map<Key, Instant> updatedRegistry = new ConcurrentHashMap<>();
public Client(UpdateListener<E, Key> updateListener,
SubscriptionFactory subscriptionFactory) {
this.updateListener = updateListener;
this.subscriptionFactory = subscriptionFactory;
this.subscriptionFactory.registerSnapshotClient(updateListener);
log.info("Created new snapshot client for entity key [{}], update type [{}] and component qualifier [{}]",
updateListener.getEntityKey(),
updateListener.getOptionalChangeType(),
updateListener.getComponentQualifier());
}
@RabbitListener(queues = {"#{@queueNameCreator.createUpdateQueueName(snapshotClient.getUpdateListener())}",
"#{@queueNameCreator.createSnapshotQueueName(snapshotClient.getUpdateListener())}"})
public void handleMessage(Message<E> rawUpdate, @Header("last_updated") Instant newUpdatedTime) {
...//more code
}
}
Each 'Client' instance has its own bean id to not clash with each other.
How can I call get the exact updateListener of this object using SpEl?
After using programattical approach and registering method I get the following exception:
Apr 28, 2015 3:22:47 PM org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler handleError
WARNING: Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'public void com.everymatrix.om2020.messaging.model.SnapshotClient.handleMessage(org.springframework.messaging.Message<E>,java.time.Instant)' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:126)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:93)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:756)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:82)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:167)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1241)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1005)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:989)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:82)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1103)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: No suitable resolver for argument [0] [type=org.springframework.messaging.Message]
Done, you need to do the following to achieve the desired behaviour.
@Configuration
@EnableRabbit
public static class OmbeRabbitListenerConfigurer implements RabbitListenerConfigurer {
@Autowired ApplicationContext applicationContext;
@Autowired SnapshotClientQueueNamesCreator snapshotClientQueueNamesCreator;
@Autowired RabbitListenerContainerFactory rabbitListenerContainerFactory;
@Autowired MessageConverter messageConverter;
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
final Collection<SnapshotClient> snapshotClients = applicationContext.getBeansOfType(SnapshotClient.class).values();
System.out.println(snapshotClients);
snapshotClients.stream().forEach(bean -> {
final String snapshotQueueName = snapshotClientQueueNamesCreator.createSnapshotQueueName(bean.getUpdateListener());
final String updateQueueName = snapshotClientQueueNamesCreator.createUpdateQueueName(bean.getUpdateListener());
Method method = Stream.of(bean.getClass().getMethods()).filter(x -> x.getName().equals("handleMessage")).findAny().get();
MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
final DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
messageHandlerMethodFactory.afterPropertiesSet();
endpoint.setMessageHandlerMethodFactory(messageHandlerMethodFactory);
endpoint.setBean(bean);
endpoint.setMethod(method);
endpoint.setId(snapshotQueueName + ":" + updateQueueName + UUID.randomUUID());
endpoint.setQueueNames(snapshotQueueName, updateQueueName);
endpoint.setExclusive(false);
registrar.registerEndpoint(endpoint, rabbitListenerContainerFactory);
});
}
}
Upvotes: 1
Views: 714
Reputation: 174769
Your question is not clear - you seem to be mixing runtime and initialization time concepts.
For example, "#{@queueNameCreator.createUpdateQueueName(e.c.doSomething())}"
is evaluated once during initialization - it's not clear from this expression what e
is, or where it comes from.
But, you seem to be passing in an E
in the payload of message: Message<E> rawUpdate
. This message came from the queue and therefore can't influence the queue name.
Perhaps if you can explain what you are trying to do rather than how you have attempted to do it, I can update this "answer" with possible solutions.
EDIT:
If you mean you want to reference some field in the current (listener) bean in your SpEL then it can't be done directly.
EDIT2:
I can't think of any way to get a reference to the current bean in the SpEL expression - it has to be a constant; that's just the way annotations work in Java; they are tied to the class, not the instance.
I think to do what you want, you would need to revert to using programmatic endpoint registration. However, you'd need to wire in a MethodRabbitListenerEndpoint
(rather than the SimpleRabbitListenerEndpoint
) to get the benefits of the annotation you are looking for (@Header
etc).
We don't really cover it in the documentation; it's a little advanced, but essentially, you need to inject the bean and Method
(for the listener), and a DefaultMessageHandlerMethodFactory
.
Upvotes: 1