Reputation: 312
I'm building Azure event hub consumer using Springboot and it works with web configuration.
I'm trying to achieve same result with non-web configuration as following snippets and getting result as shown is console.
Dependancies:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.0.3</version>
</dependency>
</dependencies>
AzureEventhubConsumerApplication.java
@EnableAsync
@SpringBootApplication
public class AzureEventhubConsumerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(AzureEventhubConsumerApplication.class).web(WebApplicationType.NONE).run(args);
}
}
EventProcessorHostService.java
public class EventProcessorHostService {
@Autowired
EventhubProperties ehProps;
@Autowired
TaskExecutor taskexecutor;
@PostConstruct
public void run() throws ExecutionException, InterruptedException {
EventHubConsumerAsyncClient client = new EventHubClientBuilder()
.connectionString(ehProps.getConnectionString(), ehProps.getEventHubName())
.consumerGroup(ehProps.getStorage().getConsumerGroupName()).buildAsyncConsumerClient();
client.receive(true).subscribe(event -> {
PartitionContext context = event.getPartitionContext();
EventData eData = event.getData();
System.out.printf("Event %s is from partition %s%n.", eData.getSequenceNumber(), context.getPartitionId());
});
}
}
Console
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.2.6.RELEASE)
2020-04-29 18:38:52.905 INFO 2248 --- [ restartedMain] c.e.c.A.AzureEventhubConsumerApplication : Starting AzureEventhubConsumerApplication on Nikhils-MacBook-Pro.local with PID 2248 (/AzureEventhubConsumer/target/classes started by nikhil in /Projects/Code/AzureEventhubConsumer)
2020-04-29 18:38:52.908 INFO 2248 --- [ restartedMain] c.e.c.A.AzureEventhubConsumerApplication : No active profile set, falling back to default profiles: default
2020-04-29 18:38:52.957 INFO 2248 --- [ restartedMain] .e.DevToolsPropertyDefaultsPostProcessor : Devtools property defaults active! Set 'spring.devtools.add-properties' to 'false' to disable
2020-04-29 18:38:53.467 INFO 2248 --- [ restartedMain] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2020-04-29 18:38:53.541 INFO 2248 --- [ restartedMain] c.a.m.eventhubs.EventHubClientBuilder : connectionId[MF_84ae03_1588165733540]: Emitting a single connection.
2020-04-29 18:38:53.623 INFO 2248 --- [ restartedMain] c.a.m.e.i.EventHubConnectionProcessor : connectionId[sbeventhumdemo.servicebus.windows.net] entityPath[spring-event-hub]: Setting next AMQP channel.
2020-04-29 18:38:53.642 INFO 2248 --- [ restartedMain] c.a.c.a.i.ReactorConnection : connectionId[MF_84ae03_1588165733540]: Creating and starting connection to sbeventhumdemo.servicebus.windows.net:5671
2020-04-29 18:38:53.662 INFO 2248 --- [ restartedMain] c.a.c.a.implementation.ReactorExecutor : connectionId[MF_84ae03_1588165733540], message[Starting reactor.]
2020-04-29 18:38:53.684 INFO 2248 --- [ single-1] c.a.c.a.i.handler.ConnectionHandler : onConnectionInit hostname[sbeventhumdemo.servicebus.windows.net], connectionId[MF_84ae03_1588165733540]
2020-04-29 18:38:53.685 INFO 2248 --- [ single-1] c.a.c.a.i.handler.ReactorHandler : connectionId[MF_84ae03_1588165733540] reactor.onReactorInit
2020-04-29 18:38:53.687 INFO 2248 --- [ single-1] c.a.c.a.i.handler.ConnectionHandler : onConnectionLocalOpen hostname[sbeventhumdemo.servicebus.windows.net:5671], connectionId[MF_84ae03_1588165733540], errorCondition[null], errorDescription[null]
2020-04-29 18:38:53.694 INFO 2248 --- [ restartedMain] c.a.c.a.i.ReactorConnection : Emitting new response channel. connectionId: MF_84ae03_1588165733540. entityPath: $management. linkName: mgmt.
2020-04-29 18:38:53.694 INFO 2248 --- [ restartedMain] a.i.RequestResponseChannel<mgmt-session> : connectionId[MF_84ae03_1588165733540] entityPath[$management]: Setting next AMQP channel.
2020-04-29 18:38:53.696 INFO 2248 --- [ restartedMain] c.a.m.e.i.ManagementChannel : Management endpoint state: UNINITIALIZED
2020-04-29 18:38:53.772 INFO 2248 --- [ single-1] c.a.c.a.i.handler.ConnectionHandler : onConnectionBound hostname[sbeventhumdemo.servicebus.windows.net], connectionId[MF_84ae03_1588165733540]
2020-04-29 18:38:53.871 INFO 2248 --- [ single-1] c.a.c.a.i.handler.ReceiveLinkHandler : onLinkLocalOpen connectionId[MF_84ae03_1588165733540], linkName[mgmt:receiver], localSource[Source{address='$management', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}]
2020-04-29 18:38:53.934 INFO 2248 --- [ restartedMain] o.s.b.d.a.OptionalLiveReloadServer : LiveReload server is running on port 35729
2020-04-29 18:38:53.961 INFO 2248 --- [ restartedMain] c.e.c.A.AzureEventhubConsumerApplication : Started AzureEventhubConsumerApplication in 1.338 seconds (JVM running for 1.897)
2020-04-29 18:38:53.971 INFO 2248 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
Upvotes: 0
Views: 1308
Reputation: 312
Execute Azure operation through TaskExecutor instance.
Change in EventProcessorHostService.java
@Component
public class EventProcessorHostService {
@Autowired
TaskExecutor taskexecutor;
@Autowired
EventhubProperties ehProps;
private Disposable subscription;
private EventHubConsumerAsyncClient client;
@PostConstruct
public void run() throws ExecutionException, InterruptedException {
taskexecutor.execute(() -> {
client = new EventHubClientBuilder()
.connectionString(ehProps.getConnectionString(), ehProps.getEventHubName())
.consumerGroup(ehProps.getStorage().getConsumerGroupName()).buildAsyncConsumerClient();
subscription = client.receive(true).subscribe(new EventProcessor());
});
}
@PreDestroy
public void destroy() {
if (subscription != null) {
subscription.dispose();
}
if (client != null) {
client.close();
}
}
}
Upvotes: 0
Reputation: 715
.subscribe()
is a non-blocking call. From your logs, it looks like the application is exiting before it can initialise the connection to Event Hubs and begin fetching data. If you allow your application to run longer, or add a Thread.sleep
after the subscribe
operation, you should see some data.
As a side note, EventProcessorClient
is better suited for production when consuming all events from an Event Hub. It can load balance and keep track of processed events.
private Disposable subscription;
private EventHubConsumerAsyncClient client;
@PostConstruct
public void run() throws ExecutionException, InterruptedException {
client = new EventHubClientBuilder()
.connectionString(ehProps.getConnectionString(), ehProps.getEventHubName())
.consumerGroup(ehProps.getStorage().getConsumerGroupName()).buildAsyncConsumerClient();
subscription = client.receive(true).subscribe(event -> {
PartitionContext context = event.getPartitionContext();
EventData eData = event.getData();
System.out.printf("Event %s is from partition %s%n.", eData.getSequenceNumber(), context.getPartitionId());
});
}
@PreDestroy
public void destroy() {
if (subscription != null) {
subscription.dispose();
}
if (client != null) {
client.close();
}
}
Upvotes: 3