Nikhil
Nikhil

Reputation: 312

Azure eventhub EventHubConsumerAsyncClient could not initiate in Springboot non-web application

I'm building Azure event hub consumer using Springboot and it works with web configuration.

I'm trying to achieve same result with non-web configuration as following snippets and getting result as shown is console.

Dependancies:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>

    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-messaging-eventhubs</artifactId>
        <version>5.0.3</version>
    </dependency>
</dependencies>

AzureEventhubConsumerApplication.java

@EnableAsync
@SpringBootApplication
public class AzureEventhubConsumerApplication {
    public static void main(String[] args) {
        new SpringApplicationBuilder(AzureEventhubConsumerApplication.class).web(WebApplicationType.NONE).run(args);
    }
}

EventProcessorHostService.java

public class EventProcessorHostService {

    @Autowired
    EventhubProperties ehProps;

    @Autowired
    TaskExecutor taskexecutor;

    @PostConstruct
    public void run() throws ExecutionException, InterruptedException {
        EventHubConsumerAsyncClient client = new EventHubClientBuilder()
                .connectionString(ehProps.getConnectionString(), ehProps.getEventHubName())
                .consumerGroup(ehProps.getStorage().getConsumerGroupName()).buildAsyncConsumerClient();

        client.receive(true).subscribe(event -> {
            PartitionContext context = event.getPartitionContext();
            EventData eData = event.getData();
            System.out.printf("Event %s is from partition %s%n.", eData.getSequenceNumber(), context.getPartitionId());

        });

    }
}

Console

 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.2.6.RELEASE)

2020-04-29 18:38:52.905  INFO 2248 --- [  restartedMain] c.e.c.A.AzureEventhubConsumerApplication : Starting AzureEventhubConsumerApplication on Nikhils-MacBook-Pro.local with PID 2248 (/AzureEventhubConsumer/target/classes started by nikhil in /Projects/Code/AzureEventhubConsumer)
2020-04-29 18:38:52.908  INFO 2248 --- [  restartedMain] c.e.c.A.AzureEventhubConsumerApplication : No active profile set, falling back to default profiles: default
2020-04-29 18:38:52.957  INFO 2248 --- [  restartedMain] .e.DevToolsPropertyDefaultsPostProcessor : Devtools property defaults active! Set 'spring.devtools.add-properties' to 'false' to disable
2020-04-29 18:38:53.467  INFO 2248 --- [  restartedMain] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2020-04-29 18:38:53.541  INFO 2248 --- [  restartedMain] c.a.m.eventhubs.EventHubClientBuilder    : connectionId[MF_84ae03_1588165733540]: Emitting a single connection.
2020-04-29 18:38:53.623  INFO 2248 --- [  restartedMain] c.a.m.e.i.EventHubConnectionProcessor    : connectionId[sbeventhumdemo.servicebus.windows.net] entityPath[spring-event-hub]: Setting next AMQP channel.
2020-04-29 18:38:53.642  INFO 2248 --- [  restartedMain] c.a.c.a.i.ReactorConnection              : connectionId[MF_84ae03_1588165733540]: Creating and starting connection to sbeventhumdemo.servicebus.windows.net:5671
2020-04-29 18:38:53.662  INFO 2248 --- [  restartedMain] c.a.c.a.implementation.ReactorExecutor   : connectionId[MF_84ae03_1588165733540], message[Starting reactor.]
2020-04-29 18:38:53.684  INFO 2248 --- [       single-1] c.a.c.a.i.handler.ConnectionHandler      : onConnectionInit hostname[sbeventhumdemo.servicebus.windows.net], connectionId[MF_84ae03_1588165733540]
2020-04-29 18:38:53.685  INFO 2248 --- [       single-1] c.a.c.a.i.handler.ReactorHandler         : connectionId[MF_84ae03_1588165733540] reactor.onReactorInit
2020-04-29 18:38:53.687  INFO 2248 --- [       single-1] c.a.c.a.i.handler.ConnectionHandler      : onConnectionLocalOpen hostname[sbeventhumdemo.servicebus.windows.net:5671], connectionId[MF_84ae03_1588165733540], errorCondition[null], errorDescription[null]
2020-04-29 18:38:53.694  INFO 2248 --- [  restartedMain] c.a.c.a.i.ReactorConnection              : Emitting new response channel. connectionId: MF_84ae03_1588165733540. entityPath: $management. linkName: mgmt.
2020-04-29 18:38:53.694  INFO 2248 --- [  restartedMain] a.i.RequestResponseChannel<mgmt-session> : connectionId[MF_84ae03_1588165733540] entityPath[$management]: Setting next AMQP channel.
2020-04-29 18:38:53.696  INFO 2248 --- [  restartedMain] c.a.m.e.i.ManagementChannel              : Management endpoint state: UNINITIALIZED
2020-04-29 18:38:53.772  INFO 2248 --- [       single-1] c.a.c.a.i.handler.ConnectionHandler      : onConnectionBound hostname[sbeventhumdemo.servicebus.windows.net], connectionId[MF_84ae03_1588165733540]
2020-04-29 18:38:53.871  INFO 2248 --- [       single-1] c.a.c.a.i.handler.ReceiveLinkHandler     : onLinkLocalOpen connectionId[MF_84ae03_1588165733540], linkName[mgmt:receiver], localSource[Source{address='$management', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}]
2020-04-29 18:38:53.934  INFO 2248 --- [  restartedMain] o.s.b.d.a.OptionalLiveReloadServer       : LiveReload server is running on port 35729
2020-04-29 18:38:53.961  INFO 2248 --- [  restartedMain] c.e.c.A.AzureEventhubConsumerApplication : Started AzureEventhubConsumerApplication in 1.338 seconds (JVM running for 1.897)
2020-04-29 18:38:53.971  INFO 2248 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'

Upvotes: 0

Views: 1308

Answers (2)

Nikhil
Nikhil

Reputation: 312

Execute Azure operation through TaskExecutor instance.

Change in EventProcessorHostService.java

@Component
public class EventProcessorHostService {

    @Autowired
    TaskExecutor taskexecutor;

    @Autowired
    EventhubProperties ehProps;

    private Disposable subscription;
    private EventHubConsumerAsyncClient client;

    @PostConstruct
    public void run() throws ExecutionException, InterruptedException {

        taskexecutor.execute(() -> {
            client = new EventHubClientBuilder()
                    .connectionString(ehProps.getConnectionString(), ehProps.getEventHubName())
                    .consumerGroup(ehProps.getStorage().getConsumerGroupName()).buildAsyncConsumerClient();
            subscription = client.receive(true).subscribe(new EventProcessor());
        });

    }

    @PreDestroy
    public void destroy() {
        if (subscription != null) {
            subscription.dispose();
        }

        if (client != null) {
            client.close();
        }

    }
}

Upvotes: 0

Connie Yau
Connie Yau

Reputation: 715

.subscribe() is a non-blocking call. From your logs, it looks like the application is exiting before it can initialise the connection to Event Hubs and begin fetching data. If you allow your application to run longer, or add a Thread.sleep after the subscribe operation, you should see some data.

As a side note, EventProcessorClient is better suited for production when consuming all events from an Event Hub. It can load balance and keep track of processed events.

private Disposable subscription;
private EventHubConsumerAsyncClient client;

@PostConstruct
public void run() throws ExecutionException, InterruptedException {
    client = new EventHubClientBuilder()
            .connectionString(ehProps.getConnectionString(), ehProps.getEventHubName())
            .consumerGroup(ehProps.getStorage().getConsumerGroupName()).buildAsyncConsumerClient();

    subscription = client.receive(true).subscribe(event -> {
        PartitionContext context = event.getPartitionContext();
        EventData eData = event.getData();
        System.out.printf("Event %s is from partition %s%n.", eData.getSequenceNumber(), context.getPartitionId());
    });
}

@PreDestroy
public void destroy() {
    if (subscription != null) {
        subscription.dispose();
    }
    if (client != null) {
        client.close();
    }
}

Upvotes: 3

Related Questions