Viktor

Reputation: 1487

Client hangs when calling Azure Event Hub and facing connection error

I want to send event messages to Azure Event Hub. I noticed that if I misconfigure something, my app hangs instead of terminating.
I wrote a very simple Java class that tries to send an event message to the Event Hub. If I mistype the endpoint of the Event Hub, the app hangs, which is pretty disappointing.

I may be misunderstanding something, but all I want to do is send a simple message. How can I do that?

    ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder();
    connectionStringBuilder
            .setEndpoint(URI.create("https://XXXXXXXXX.servsssicebus.windows.net"))
            .setTransportType(TransportType.AMQP_WEB_SOCKETS)
            .setSasKeyName("XXX")
            .setSasKey("XXX");
    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    final EventHubClient ehClient =
            EventHubClient.createFromConnectionStringSync(
                    connectionStringBuilder.toString(),
                    RetryPolicy.getNoRetry(),
                    scheduledExecutorService
            );
    ehClient.sendSync(EventData.create("Test Message".getBytes()));
    ehClient.closeSync();
    scheduledExecutorService.shutdown();

I use the following dependency:

    compile "com.microsoft.azure:azure-eventhubs:3.2.0"

I'd appreciate any help! Thanks!

Upvotes: 0

Views: 2775

Answers (2)

krishg

Reputation: 6508

EDIT:

I believe your app hangs because the executor never gets a chance to shut down when an error occurs. Wrap the code in try/finally, as below:

    ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder();
    connectionStringBuilder
            .setEndpoint(URI.create("https://XXXXXXXXX.servsssicebus.windows.net"))
            .setTransportType(TransportType.AMQP_WEB_SOCKETS)
            .setSasKeyName("XXX")
            .setSasKey("XXX");
    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    EventHubClient ehClient = null;
    try {
        ehClient =
                EventHubClient.createFromConnectionStringSync(
                        connectionStringBuilder.toString(),
                        RetryPolicy.getNoRetry(),
                        scheduledExecutorService
                );
        ehClient.sendSync(EventData.create("Test Message".getBytes()));
    }
    finally {
        if (ehClient != null)
            ehClient.closeSync();
        scheduledExecutorService.shutdown();
    }
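
To see the mechanism without any Azure dependency, here is a minimal sketch that uses only the JDK. The class name ExecutorShutdownDemo and the method runWithExecutor are made up for illustration, and the IllegalStateException merely stands in for a connection error. The executor created by Executors.newSingleThreadScheduledExecutor() uses non-daemon worker threads, so once a worker has started, the JVM stays alive until shutdown() is called. Presumably that is what keeps your process running after the failure.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ExecutorShutdownDemo {

    // Runs the given work with a fresh executor and always shuts the executor
    // down, even when the work throws. The executor is returned so that the
    // caller can inspect its state afterwards.
    public static ScheduledExecutorService runWithExecutor(Runnable work) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        // Start the worker thread, as a client would by scheduling I/O on it.
        executor.submit(() -> { });
        try {
            work.run();
        } catch (RuntimeException e) {
            System.err.println("work failed: " + e.getMessage());
        } finally {
            // Without this call the non-daemon worker thread keeps the JVM alive.
            executor.shutdown();
        }
        return executor;
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = runWithExecutor(() -> {
            // Stand-in for a failing connection attempt (hypothetical).
            throw new IllegalStateException("simulated connection error");
        });
        boolean terminated = executor.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("executor terminated: " + terminated);
    }
}
```

If you move executor.shutdown() out of the finally block and place it after work.run(), the same program no longer exits on its own when the work throws.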

N.B. Your code is using the old azure-eventhubs package (Event Hubs v3) as mentioned in this tutorial. The latest package, azure-messaging-eventhubs (Event Hubs v5, which uses a producer/consumer pattern), has slightly different APIs, which are described in this tutorial. You should use the new SDK for fresh development.

import com.azure.messaging.eventhubs.*;

public class Sender {
    public static void main(String[] args) {
        final String connectionString = "EVENT HUBS NAMESPACE CONNECTION STRING";
        final String eventHubName = "EVENT HUB NAME";

        // create a producer using the namespace connection string and event hub name
        EventHubProducerClient producer = new EventHubClientBuilder()
            .connectionString(connectionString, eventHubName)
            .buildProducerClient();

        // prepare a batch of events to send to the event hub    
        EventDataBatch batch = producer.createBatch();
        batch.tryAdd(new EventData("First event"));
        batch.tryAdd(new EventData("Second event"));
        batch.tryAdd(new EventData("Third event"));
        batch.tryAdd(new EventData("Fourth event"));
        batch.tryAdd(new EventData("Fifth event"));

        // send the batch of events to the event hub
        producer.send(batch);

        // close the producer
        producer.close();
    }
}

As a further note, there is also a migration guide from v3 to v5 here.

Even with the old package, I could not reproduce your hang using either Executors.newScheduledThreadPool(4) or Executors.newSingleThreadScheduledExecutor(), as long as the executor is shut down gracefully as described at the beginning. If I supply a wrong connection string by mistake, it immediately throws an exception: Exception in thread "main" com.microsoft.azure.eventhubs.CommunicationException: A communication error has occurred. This may be due to an incorrect host name in your connection string or a problem with your network connection.
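
If you also want a hard upper bound on how long a blocking call may take, one general JDK technique (not something specific to the Event Hubs SDK) is to run the call on a daemon thread and wait for it with a timeout. The sketch below is only an illustration: BoundedCall and callWithTimeout are hypothetical names, and the lambdas stand in for the real SDK calls, which throw checked exceptions and would need to be wrapped inside the lambda.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class BoundedCall {

    // Daemon threads: an abandoned, still-blocked call cannot keep the JVM alive.
    private static final ExecutorService DAEMON_POOL = Executors.newCachedThreadPool(runnable -> {
        Thread thread = new Thread(runnable, "bounded-call");
        thread.setDaemon(true);
        return thread;
    });

    // Runs blockingCall on a daemon thread and waits at most timeoutMillis for
    // its result. Returns null if the call throws or does not finish in time.
    public static <T> T callWithTimeout(Supplier<T> blockingCall, long timeoutMillis) {
        CompletableFuture<T> future = CompletableFuture.supplyAsync(blockingCall, DAEMON_POOL);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Stop waiting. Note that cancel() does not interrupt the worker thread.
            future.cancel(true);
            return null;
        } catch (ExecutionException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        // Stand-ins for a send that succeeds and a send that blocks (hypothetical).
        String fast = callWithTimeout(() -> "sent", 1000);
        String slow = callWithTimeout(() -> {
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException ignored) {
                // Nothing to clean up in this stand-in.
            }
            return "too late";
        }, 200);
        System.out.println("fast call: " + fast);
        System.out.println("slow call: " + slow);
    }
}
```

This only limits how long the caller waits. The underlying call keeps running in the background, so closing the client and shutting down the executor in finally is still needed.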

Upvotes: 1

suziki

Reputation: 14080

I used Maven to create a Java project, then added these dependencies to pom.xml:

<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-eventhubs</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.6</version>
</dependency>

And this is the code to send the event messages:

package test;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import java.io.IOException;
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
public class App
{
    public static void main( String[] args ) throws Exception
    {
        final ConnectionStringBuilder connStr = new ConnectionStringBuilder()
                .setNamespaceName("testbowman")
                .setEventHubName("test")
                .setSasKeyName("testbowman")
                .setSasKey("xxxxxx");
        final Gson gson = new GsonBuilder().create();
        final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
        // Create the client inside try so the executor is shut down even if connecting fails.
        EventHubClient ehClient = null;

        try {
            ehClient = EventHubClient.createSync(connStr.toString(), executorService);
            for (int i = 0; i < 10; i++) {
                String payload = "Message " + Integer.toString(i);
                byte[] payloadBytes = gson.toJson(payload).getBytes(Charset.defaultCharset());
                EventData sendEvent = EventData.create(payloadBytes);
                ehClient.sendSync(sendEvent);
            }
            System.out.println(Instant.now() + ": Send Complete...");
            System.out.println("Press Enter to stop.");
            System.in.read();
        } finally {
            if (ehClient != null) {
                ehClient.closeSync();
            }
            executorService.shutdown();
        }
        System.out.println( "Hello World!" );
        System.out.println( "!!!!!!!!!!!!!" );
    }
}

(I hid the SAS key; I think you know where to get it. :))

Finally, I can see the messages arriving in the metrics (they do not show up immediately; you need to wait a little while).

This is the official doc:

https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-java-get-started-send#write-code-to-send-events

Upvotes: 0
