Mark Rovetta
Mark Rovetta

Reputation: 691

Filtering Subscriptions to Windows Azure Service Bus Topics using Java

I would like to use Windows Azure Service Bus Topics with subscription filters in my Java application. I am using the Windows Azure Plugin for Eclipse with Java (by Microsoft Open Technologies) and the Windows Azure SDK 2.0.

I found that the basic example code on the Microsoft website showing how to do this with the Java API is not working.

How should the basic example code accomplish the following programmatically from a Java application using the API?

1) Get a ServiceBusContract object for an existing “TestTopic” in my existing namespace.

2) Create an “AllMessages” subscription using the default MatchAll filter. This subscription receives all the messages sent to “TestTopic” in it’s virtual queue.

3) Create a “LowMessages” subscription using a SqlFilter filter to filter for “MessageNumber <=3”. The virtual queue for this subscription should only receive messages having a MessageNumber custom property value less than or equal to 3 .

4) Create a “HighMessages” subscription using a SqlFilter to filter for “MessageNumber <=3”. The virtual queue for this subscription should only receive messages having a MessageNumber custom property value greater than 3.

5) Send a batch of example brokered messages to “TestTopic” with ( 0<= MessageNumber <7)

6) Receive the messages for all three subscriptions and show they have been filtered.

I believe I found reasons the example was not working, and I include the corrected Java code as an answer below for others that may also want to see a basic example of how to do this.

Upvotes: 2

Views: 2890

Answers (1)

Mark Rovetta
Mark Rovetta

Reputation: 691

After obtaining my default management credentials from the Azure portal as describe in the documentation my application was able to get a ServiceBusContract object for my Service Bus namespace as follows:

public class Service 
{
private Configuration config;
public Service() 
{
   String namespace = "jasper";
   String issuer = "owner";
   String key = "BB9BB9BBBbBBBbBbbBBbB99SS9b+Bb9BbB+bbBBbBB9=";
   String serviceBusRootUri = ".servicebus.windows.net";
   String wrapRootUri = "-sb.accesscontrol.windows.net/WRAPv0.9";       
   this.config = ServiceBusConfiguration.configureWithWrapAuthentication(
      namespace, 
      issuer, 
      key,
      serviceBusRootUri,
      wrapRootUri);     
}   
public ServiceBusContract getservice()
{       
   ServiceBusContract service = ServiceBusService.create(config);
   return service;
}   
}

The following Java code can then be used to create three subscriptions to the “TestTopic” topic. “TestTopic” must already exist in the “jasper” namespace. The “AllMessages” subscription is the default and receives all the messages sent to the topic. The “LowMessages” and “HighMessages” subscriptions use SqlFilter rules to filter messages based upon the value of the MessageNumber custom property. The original example code omits that it is necessary to provide a rule name and that it necessary to delete the default rule. If you do not delete the default rule, the subscription still receives all messages.

public class Make_sub_rule 
{
public static void main(String[] args) throws ServiceException {        
   Service creds = new Service();
   ServiceBusContract service = creds.getservice();         
   SubscriptionInfo subInfo = new SubscriptionInfo("AllMessages");
   service.createSubscription("TestTopic", subInfo);
   System.out.println(subInfo.getName() + " Default Rules");

   SubscriptionInfo subInfo1 = new SubscriptionInfo("LowMessages");
   CreateSubscriptionResult result1 = service.createSubscription("TestTopic", subInfo1);
   RuleInfo ruleInfo1 = new RuleInfo("RULENAME1");
   ruleInfo1 = ruleInfo1.withSqlExpressionFilter("MessageNumber <= 3");
   CreateRuleResult ruleResult1 = 
      service.createRule("TestTopic", "LowMessages", ruleInfo1);
   service.deleteRule("TestTopic", "LowMessages", "$Default");
   System.out.println(subInfo1.getName() + " " + result1.toString());
   System.out.println(ruleInfo1.getName() + " " + ruleResult1.toString());

   SubscriptionInfo subInfo2 = new SubscriptionInfo("HighMessages");
   CreateSubscriptionResult result2 = service.createSubscription("TestTopic", subInfo2);
   RuleInfo ruleInfo2 = new RuleInfo("RULENAME2");
   ruleInfo2 = ruleInfo2.withSqlExpressionFilter("MessageNumber > 3");
   CreateRuleResult ruleResult2 = 
      service.createRule("TestTopic", "HighMessages", ruleInfo2);
   service.deleteRule("TestTopic", "HighMessages", "$Default");
   System.out.println(subInfo2.getName() + " " + result2.toString());
   System.out.println(ruleInfo2.getName() + " " + ruleResult2.toString());
    }
}

The following sends a batch of brokered messages to “TestTopic” and increments the value of the MessageNumber custom property.

public class SendSbMsTopicB 
{
public static void main(String[] args) throws ServiceException {        
   Service creds = new Service();
   ServiceBusContract service = creds.getservice(); 
   for (int i=0; i<7; i++) 
   {
   BrokeredMessage message = new BrokeredMessage("Test message" + i);
   message.setLabel("Day" + i);
   message.setProperty("MessageNumber", i);
   message.setProperty("CustomProperty", "CustomTestValue" + i);
   service.sendTopicMessage("TestTopic", message);
   System.out.println("send MessageNumber " + i + " to topic");
   }
}
}

Run the following code repeatedly to read the virtual queues of the “AllMessages”, “LowMessages”, or “HighMessages” subscriptions (change the value of subscriptionName). Each message is deleted from the virtual queue when it is read until there are no more messages remaining in that subscription’s queue. Note that more than one subscription can receive the same message and that filtered subscriptions do not receive all messages.

public class GetSbMessSub 
{
public static void main(String[] args) throws ServiceException {        
   Service creds = new Service();
   ServiceBusContract service = creds.getservice();     
   String subscriptionName = "LowMessages";
   ReceiveMessageOptions opts = ReceiveMessageOptions.DEFAULT;
   opts.setReceiveMode(ReceiveMode.PEEK_LOCK);
   ReceiveSubscriptionMessageResult resultQM = service.receiveSubscriptionMessage("TestTopic", subscriptionName, opts);
   BrokeredMessage message = resultQM.getValue();       
   if (message != null && message.getMessageId() != null) 
   {
   try {
   System.out.println("Subscription: " + subscriptionName);
   System.out.println("MessageNumber: " + message.getProperty("MessageNumber"));
   service.deleteMessage(message);}
   catch (Exception ex){
   System.out.println("Inner exception encountered!");
   service.unlockMessage(message);}
   }
   else {System.out.println("There are no more messages.");}                
   }    
}

Upvotes: 6

Related Questions