Reputation: 1487
I want to send event messages to Azure Event Hub. I noticed if I misconfigured something then my app hangs and not terminates.
I wrote a very simple Java class that tries to send event message to the Event Hub. If I mistype the endpoint of the Event Hub then the app hangs. Which is pretty disappointing.
There is a chance that I misunderstand something but what I want to do is to send a simple message and that's all. How can I do that?
ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder();
connectionStringBuilder
.setEndpoint(URI.create("https://XXXXXXXXX.servsssicebus.windows.net"))
.setTransportType(TransportType.AMQP_WEB_SOCKETS)
.setSasKeyName("XXX")
.setSasKey("XXX");
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
final EventHubClient ehClient =
EventHubClient.createFromConnectionStringSync(
connectionStringBuilder.toString(),
RetryPolicy.getNoRetry(),
scheduledExecutorService
);
ehClient.sendSync(EventData.create("Test Message".getBytes()));
ehClient.closeSync();
scheduledExecutorService.shutdown();
I use the following dependency:
compile "com.microsoft.azure:azure-eventhubs:3.2.0"
I'd appreciate any help! Thanks!
Upvotes: 0
Views: 2775
Reputation: 6508
EDIT:
I believe the reason for your hang is the executor is not getting chance to shutdown in case of error. You should wrap the code within try finally like below:
ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder();
connectionStringBuilder
.setEndpoint(URI.create("https://XXXXXXXXX.servsssicebus.windows.net"))
.setTransportType(TransportType.AMQP_WEB_SOCKETS)
.setSasKeyName("XXX")
.setSasKey("XXX");
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
EventHubClient ehClient = null;
try {
ehClient =
EventHubClient.createFromConnectionStringSync(
connectionStringBuilder.toString(),
RetryPolicy.getNoRetry(),
scheduledExecutorService
);
ehClient.sendSync(EventData.create("Test Message".getBytes()));
}
finally {
if (ehClient != null)
ehClient.closeSync();
scheduledExecutorService.shutdown();
}
N.B. Your code code is using the old azure-eventhubs package (Event Hub v3) as mentioned in this tutorial. The latest package azure-messaging-eventhubs (Event Hub v5 using Producer/Consumer pattern) has bit different APIs which is described in this tutorial. You should use the new SDK if it's fresh development.
import com.azure.messaging.eventhubs.*;
public class Sender {
public static void main(String[] args) {
final String connectionString = "EVENT HUBS NAMESPACE CONNECTION STRING";
final String eventHubName = "EVENT HUB NAME";
// create a producer using the namespace connection string and event hub name
EventHubProducerClient producer = new EventHubClientBuilder()
.connectionString(connectionString, eventHubName)
.buildProducerClient();
// prepare a batch of events to send to the event hub
EventDataBatch batch = producer.createBatch();
batch.tryAdd(new EventData("First event"));
batch.tryAdd(new EventData("Second event"));
batch.tryAdd(new EventData("Third event"));
batch.tryAdd(new EventData("Fourth event"));
batch.tryAdd(new EventData("Fifth event"));
// send the batch of events to the event hub
producer.send(batch);
// close the producer
producer.close();
}
}
On further note, there is also a migration guide from v3 to v5 here.
Even with old package, I could not reproduce your hang issue using either Executors.newScheduledThreadPool(4) or Executors.newSingleThreadScheduledExecutor() when closed Executor gracefully as mentioned in the beginning. If I give wrong connection string by mistake, it immediately throws exception: Exception in thread "main" com.microsoft.azure.eventhubs.CommunicationException: A communication error has occurred. This may be due to an incorrect host name in your connection string or a problem with your network connection.
Upvotes: 1
Reputation: 14080
I use Maven to create a java project, then add dependency in pom.xml:
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.6</version>
</dependency>
And This is the code to send the event message:
package test;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import java.io.IOException;
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
public class App
{
public static void main( String[] args ) throws Exception
{
final ConnectionStringBuilder connStr = new ConnectionStringBuilder()
.setNamespaceName("testbowman")
.setEventHubName("test")
.setSasKeyName("testbowman")
.setSasKey("xxxxxx");
final Gson gson = new GsonBuilder().create();
final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
final EventHubClient ehClient = EventHubClient.createSync(connStr.toString(), executorService);
try {
for (int i = 0; i < 10; i++) {
String payload = "Message " + Integer.toString(i);
byte[] payloadBytes = gson.toJson(payload).getBytes(Charset.defaultCharset());
EventData sendEvent = EventData.create(payloadBytes);
ehClient.sendSync(sendEvent);
}
System.out.println(Instant.now() + ": Send Complete...");
System.out.println("Press Enter to stop.");
System.in.read();
} finally {
ehClient.closeSync();
executorService.shutdown();
}
System.out.println( "Hello World!" );
System.out.println( "!!!!!!!!!!!!!" );
}
}
(I hide the sas key, I think you know where to get the sas key.:))
At last, I can see the messages come in on the metrics(Can not see immediately, need wait a few time.):
This is the offcial doc:
Upvotes: 0