Reputation: 1251
Below is a detailed explanation.
Currently I have integrated Azure Service Bus in my application and we listen message as soon as spring boot application starts. Now I want to modify this logic. By default Azure Service Bus Message Listener will be disable. On ApplicationReadyEvent
I want to perform some task and after that again I want to enable Azure Service Bus Message Listener to start listening from topic or queue.
So how can I achieve that ?
application.yml
spring:
cloud:
azure:
servicebus:
namespace: **********
xxx:
azure:
servicebus:
connection: ***********
queue: **********
AzureConfiguration.java
import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import com.azure.spring.messaging.servicebus.core.ServiceBusProcessorFactory;
import com.azure.spring.messaging.servicebus.core.listener.ServiceBusMessageListenerContainer;
import com.azure.spring.messaging.servicebus.core.properties.ServiceBusContainerProperties;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessageChannel;
@Configuration
public class AzureConfiguration{
@Value("${xxx.azure.servicebus.connection}")
private String serviceBusConnection;
@Value("${xxx.azure.servicebus.queue}")
private String serviceBusQueue;
private static final String SERVICE_BUS_INPUT_CHANNEL = "yyyyy";
private static final String SENSOR_DATA_CHANNEL = "zzzzz";
private static final String SERVICE_BUS_LISTENER_CONTAINER = "aaaaa";
@Bean(name = SERVICE_BUS_LISTENER_CONTAINER)
public ServiceBusMessageListenerContainer serviceBusMessageListenerContainer(ServiceBusProcessorFactory processorFactory) {
ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
containerProperties.setConnectionString(serviceBusConnection);
containerProperties.setEntityName(serviceBusQueue);
containerProperties.setAutoComplete(true);
return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
}
@Bean
public ServiceBusInboundChannelAdapter serviceBusInboundChannelAdapter(
@Qualifier(SERVICE_BUS_INPUT_CHANNEL) MessageChannel inputChannel,
@Qualifier(SERVICE_BUS_LISTENER_CONTAINER) ServiceBusMessageListenerContainer listenerContainer) {
ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean(name = SERVICE_BUS_INPUT_CHANNEL)
public MessageChannel serviceBusInputChannel() {
return new DirectChannel();
}
@Bean(name = SENSOR_DATA_CHANNEL)
public MessageChannel sensorDataChannel() {
return new DirectChannel();
}
@Bean
public IntegrationFlow serviceBusMessageFlow() {
return IntegrationFlows.from(SERVICE_BUS_INPUT_CHANNEL)
.<byte[], String>transform(String::new)
.channel(SENSOR_DATA_CHANNEL)
.get();
}
}
AppEventListenerService.java
import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import java.util.List;
@Slf4j
@Service
@AllArgsConstructor
public class AppEventListenerService{
@EventListener(ApplicationReadyEvent.class)
public void OnApplicationStarted() {
log.debug("Enter OnApplicationStarted");
// By Default Azure Service Bus Message Listener will be disable
// do some task
// Enable Azure Bus Message Listener
log.debug("Exit OnApplicationStarted");
}
}
In above code in AppEventListenerService.java ,
// Enable Azure Bus Message Listener - Here I want to start ServiceBusConsumer to receive message from topic/queue.
Upvotes: 0
Views: 2633
Reputation: 83
Literally, if you just want to stop the listener and then start it on ApplicationReadyEvent
, then you can autowire the ServiceBusInboundChannelAdapter
(or ServiceBusMessageListenerContainer
) in your AppEventListenerService.java
and then simply call the its stop() and start() API in the AppEventListenerService#OnApplicationStarted
method.
However, both the ServiceBusMessageListenerContainer
and ServiceBusInboundChannelAdapter
implements SmartLifecycle
interface and is enabled auto-start-up by default. So if you use the above solution, the listener (as well as adapter) has been triggered to start before ApplicationReadyEvent
, which means there will still be a period that the listener is consuming messages.
So I assume you may want to turn off the listener till your own business logic has been done. If so, then currently ServiceBusMessageListenerContainer
doesn't provide the function to disable auto-start-up, and we will put your feature request to our backlog.
But you could still use the below workarounds to meet your request.
ServiceBusMessageListenerContainer
to override the auto-start-up behavior,public class CustomServiceBusMessageListenerContainer extends ServiceBusMessageListenerContainer {
private boolean autoStartUp = true;
/**
* Create an instance using the supplied processor factory and container properties.
* @param processorFactory the processor factory.
* @param containerProperties the container properties.
*/
public CustomServiceBusMessageListenerContainer(ServiceBusProcessorFactory processorFactory, ServiceBusContainerProperties containerProperties) {
super(processorFactory, containerProperties);
}
public void setAutoStartUp(boolean autoStartUp) {
this.autoStartUp = autoStartUp;
}
@Override
public final boolean isAutoStartup() {
return this.autoStartUp;
}
}
ServiceBusMessageListenerContainer
and ServiceBusInboundChannelAdapter
bean, disable their auto-start-up function. @Bean(SERVICE_BUS_LISTENER_CONTAINER)
public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
containerProperties.setEntityName(QUEUE_NAME);
...
CustomServiceBusMessageListenerContainer listenerContainer = new CustomServiceBusMessageListenerContainer(processorFactory, containerProperties);
listenerContainer.setAutoStartUp(false);
return listenerContainer;
}
@Bean
public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
@Qualifier(SERVICE_BUS_INPUT_CHANNEL) MessageChannel inputChannel,
@Qualifier(SERVICE_BUS_LISTENER_CONTAINER) ServiceBusMessageListenerContainer listenerContainer) {
ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
adapter.setOutputChannel(inputChannel);
adapter.setAutoStartup(false);
return adapter;
}
ServiceBusInboundChannelAdapter
after your business logic in AppEventListenerService#OnApplicationStarted
.This might be a bit hack, since we don't expose the api to disable auto-start-up in ServiceBusMessageListenerContainer
, but it can be done in ServiceBusInboundChannelAdapter
. So you can choose to not declare a bean of ServiceBusMessageListenerContainer
but change it as a local variable for the adapter,
@Bean
public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
@Qualifier(SERVICE_BUS_INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusProcessorFactory processorFactory) {
ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
containerProperties.setEntityName(QUEUE_NAME);
...
ServiceBusMessageListenerContainer listenerContainer = new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
adapter.setOutputChannel(inputChannel);
adapter.setAutoStartup(false);
return adapter;
}
then start the ServiceBusInboundChannelAdapter
after your business logic in AppEventListenerService#OnApplicationStarted
.
Upvotes: 1
Reputation: 7656
As you're using an integration flow registered as a bean the simplest way to start/stop it is to autowire it as StandardIntegrationFlow
and call the corresponding method like so:
@Slf4j
@Service
@DependsOn({"serviceBusMessageFlow"})
@RequiredArgsConstructor
public class AppEventListenerService {
private final StandardIntegrationFlow serviceBusMessageFlow;
@EventListener(ApplicationReadyEvent.class)
public void OnApplicationStarted() {
log.debug("Enter OnApplicationStarted");
// Disable Azure Bus Message Listener
serviceBusMessageFlow.stop();
// do some task
// Enable Azure Bus Message Listener
serviceBusMessageFlow.start();
log.debug("Exit OnApplicationStarted");
}
}
Note the @DependsOn
annotation might be needed to force the flow bean to be initialized before the event listener bean.
Also, it should be noted that some messages might happen to go through after the flow is initialized and before the listener is triggered.
Upvotes: 0
Reputation: 2069
Here I have a work around where we use the JMS to consume the service bus message
The reason to use JMS is that when we use the @JMSListener
we can start stop it.
Now to Implement JMS with ServiceBus refer this MSDOC
Now you have to Autowired
this JmsListenerEndpointRegistry
object and stop the listener.
@Autowired
JmsListenerEndpointRegistry registry;
To stop the JMS you will have to use the stop function:
registry.stop();
@Component
@RestController
public class Reciever {
@Autowired
JmsListenerEndpointRegistry registry;
@GetMapping("/stop")
public String readBlobFile ()
{
registry.stop();
return "Stopped" ;
}
@GetMapping("/start")
public String readBlobFile1 ()
{
registry.start();
return "StARTED" ;
}
private static final String QUEUE_NAME = "test";
private final Logger logger = LoggerFactory.getLogger(Reciever.class);
@JmsListener(destination = QUEUE_NAME, containerFactory = "jmsListenerContainerFactory")
public void receiveMessage(String s) {
logger.info("Received message: {}", s);
}
}
/stop
Api which will stop the JMS and the message will only start coming once the /start
api is called .output:
Upvotes: 0