Reputation: 95
I am working on receiving pending messages in Azure Service Bus queue.
I have created a queue (SampleQueue) in Azure ServiceBus and I am able to send the message successfully in that queue via POSTMAN using a SAS token which I generate with my Java program.
I am also getting a 201 created status after hitting my service bus queue api url(below image).
I want to receive the message pending in my Service bus queue. I went through some links about receiving message (https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-java-how-to-use-queues), but this does not contain information about how I can receive and view those messages.
My Java code that receives the message from Service bus queue looks like below[I am a novice in Java]:-
public class Test2 {
public static void main(String[] args) throws ServiceException {
String namespace = "SampleNamespace";
String sharedKeyName = "RootManageSharedAccessKey";
String sharedSecretKey = "t+U5ERMAnIyxgEUDUouGOKn6ADM/CuLWzEJZtauwVsc=";
String queueName = "QueueName";
// Azure Service Bus Service
com.microsoft.windowsazure.Configuration config = ServiceBusConfiguration.configureWithSASAuthentication(namespace, sharedKeyName, sharedSecretKey, ".servicebus.windows.net");
ServiceBusContract service = ServiceBusService.create(config);
// Receive and Delete Messages
ReceiveMessageOptions opts = ReceiveMessageOptions.DEFAULT;
opts.setReceiveMode(ReceiveMode.RECEIVE_AND_DELETE);
while (true) {
ReceiveQueueMessageResult resultQM = service.receiveQueueMessage(queueName , opts);
BrokeredMessage message = resultQM.getValue();
if (message != null && message.getMessageId() != null) {
System.out.println("Body: " + message.toString());
System.out.println("MessageID: " + message.getMessageId());
} else {
System.out.println("No more messages.");
break;
}
}
}
}
But when I run this code, I get the below error:-
Exception in thread "main" java.lang.NoClassDefFoundError: javax/ws/rs/WebApplicationException
at java.lang.Class.getDeclaredConstructors0(Native Method)
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
at java.lang.Class.getConstructors(Class.java:1651)
at com.microsoft.windowsazure.core.DefaultBuilder.findInjectConstructor(DefaultBuilder.java:67)
at com.microsoft.windowsazure.core.DefaultBuilder.add(DefaultBuilder.java:94)
at com.microsoft.windowsazure.services.servicebus.Exports.register(Exports.java:34)
at com.microsoft.windowsazure.core.DefaultBuilder.create(DefaultBuilder.java:46)
at com.microsoft.windowsazure.Configuration.<init>(Configuration.java:80)
at com.microsoft.windowsazure.Configuration.load(Configuration.java:100)
at com.microsoft.windowsazure.Configuration.getInstance(Configuration.java:90)
at com.microsoft.windowsazure.services.servicebus.ServiceBusConfiguration.configureWithSASAuthentication(ServiceBusConfiguration.java:252)
at com.rocky.servicebus.queue.Test2.main(Test2.java:24)
Caused by: java.lang.ClassNotFoundException: javax.ws.rs.WebApplicationException
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
Can anyone please help in rectifying what I am doing wrong? Would be greatful for any help.
Thanks, Rudra
Upvotes: 1
Views: 3829
Reputation: 95
For all the wanderers, below is the working Java code to fetch pending messages from Azure SB Queue:-
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import com.microsoft.windowsazure.services.servicebus.ServiceBusConfiguration;
import com.microsoft.windowsazure.services.servicebus.ServiceBusContract;
import com.microsoft.windowsazure.services.servicebus.ServiceBusService;
import com.microsoft.windowsazure.services.servicebus.models.BrokeredMessage;
import com.microsoft.windowsazure.services.servicebus.models.ReceiveMessageOptions;
import com.microsoft.windowsazure.services.servicebus.models.ReceiveMode;
import com.microsoft.windowsazure.services.servicebus.models.ReceiveQueueMessageResult;
public class Test1 {
//static StringWriter writer = new StringWriter();
public static void main(String...s) throws Exception{
com.microsoft.windowsazure.Configuration config = ServiceBusConfiguration.configureWithSASAuthentication("Your_NameSpace", "RootManageSharedAccessKey", "Mkf1H3g9qg0LrNEP1QbZ/EJKSARmJZQdOI6ek6obalI=", ".servicebus.windows.net");
ServiceBusContract service = ServiceBusService.create(config);
ReceiveMessageOptions opts = ReceiveMessageOptions.DEFAULT;
opts.setReceiveMode(ReceiveMode.PEEK_LOCK);
while(true)
{
ReceiveQueueMessageResult resultQM = service.receiveQueueMessage("Queue_Name", opts);
BrokeredMessage message = resultQM.getValue();
if (message != null && message.getMessageId() != null)
{
try
{
// IOUtils.copy(message.getBody(), writer, encoding);
Scanner s1 = new Scanner(message.getBody()).useDelimiter("\\A");
String result = s1.hasNext() ? s1.next() : "";
//above will convert InputStream in String
System.out.println("Body: " + message.toString());
System.out.println("MainBody : " + result );
System.out.println("MessageID: " + message.getMessageId());
System.out.println("Custom Property: " +
message.getProperty("TestProperty"));
// Remove message from queue
System.out.println("Deleting this message.");
service.deleteMessage(message);
}
catch (Exception ex)
{
// Indicate a problem, unlock message in queue
System.out.println("Inner exception encountered!");
service.unlockMessage(message);
}
}
else
{
System.out.println("Finishing up - no more messages.");
break;
// Added to handle no more messages in the queue.
// Could instead wait for more messages to be added.
}
}
}
}
Make sure to get required Maven Dependencies for "BrokeredMessage".
Thanks, Rudra
Upvotes: 1
Reputation: 5549
Based on the tutorial for receiving message, you need to create a queue client, and register a message handler for it.
A) Get connection string.
B) A code sample for sending and receiving messages
public static void registerReceiver(QueueClient queueClient, ExecutorService executorService) throws Exception {
queueClient.registerMessageHandler(
new IMessageHandler() {
public CompletableFuture<Void> onMessageAsync(IMessage message) {
if (message.getLabel() != null &&
message.getContentType() != null &&
message.getLabel().contentEquals("TestMessage") &&
message.getContentType().contentEquals("text/plain")) {
System.out.printf(
"\nMessage received: \n -->MessageId = %s\n -->ContentType = %s\n -->Content = %s\n",
message.getMessageId(),
message.getContentType(),
new String(message.getBody())
);
return queueClient.completeAsync(message.getLockToken());
}
return queueClient.abandonAsync(message.getLockToken());
}
public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
System.out.printf(exceptionPhase + "-" + throwable.getMessage());
}
},
new MessageHandlerOptions(1, false, Duration.ofSeconds(10)),
executorService
);
}
public static void sendMessages(QueueClient client) throws ServiceBusException, InterruptedException {
for (int i = 0; i < 100; i++) {
String messageId = Integer.toString(i);
Message message = new Message("This is message " + i);
message.setContentType("text/plain");
message.setLabel("TestMessage");
message.setMessageId(messageId);
message.setTimeToLive(Duration.ofMinutes(10));
client.send(message);
System.out.printf("Message sent: Id = %s \n", message.getMessageId());
}
}
public static void main(String[] args) throws Exception {
String connectionString = "your_connection_string, Endpoint=sb://j*9.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=V*=";
String queueName = "your_queue_name, testQueue";
QueueClient client = new QueueClient(new ConnectionStringBuilder(connectionString, queueName), ReceiveMode.PEEKLOCK);
sendMessages(client);
client.close();
QueueClient receiveClient = new QueueClient(new ConnectionStringBuilder(connectionString, queueName), ReceiveMode.PEEKLOCK);
ExecutorService executorService = Executors.newSingleThreadExecutor();
registerReceiver(receiveClient, executorService);
Thread.sleep(60 * 1000); // Wait for 60 seconds to receive all the messages.
receiveClient.close();
executorService.shutdown();
}
Result:
100 messages will be sent.
Message sent: Id = 0
Message sent: Id = 1
Message sent: Id = 2
Message sent: Id = 3
*
*
*
Message sent: Id = 99
And then will start to receive messages.
Message received:
-->MessageId = 0
-->ContentType = text/plain
-->Content = This is message 0
Message received:
-->MessageId = 1
-->ContentType = text/plain
-->Content = This is message 1
Message received:
-->MessageId = 2
-->ContentType = text/plain
-->Content = This is message 2
*
*
*
Message received:
-->MessageId = 99
-->ContentType = text/plain
-->Content = This is message 99
Upvotes: 2