agata
agata

Reputation: 471

How to get all the queues and topics from solace

I want to discover all the destinations from solace (queues and topics)

I tried using MBeanServerConnection and query after names (but I didn't find a proper way to use this) or JNDI lookups Destination dest = (Destination) context.lookup(Dest_name), but I don't have the names of the queues/topics. I am using solace - jms library.

I am searching for smth like this: (but for solace, not activeMq) get all Queue from activeMQ

Upvotes: 1

Views: 3240

Answers (3)

Ashish Sharma
Ashish Sharma

Reputation: 1

You can get message VPN specific queues and topics using following SEMPv2 command.

curl -s -X GET -u semp_user:semp_pass management_host:management_port/SEMP/v2/monitor/msgVpns/{vpn-name}/queues?select="queueName"

curl -s -X GET -u semp_user:semp_pass management_host:management_port/SEMP/v2/monitor/msgVpns/{vpn-name}/topicEndpoints?select="topicEndpointName"

Upvotes: 0

Axel Podehl
Axel Podehl

Reputation: 4323

Here is some source code that might help. With the appliance configured correctly, SEMP is also available over JMS on topic "#SEMP/(router)/SHOW".

  /**
   * Return the SolTopicInfo for this topic (or all topics if 'topic' is null).
   * 
   * @param session
   * @param endpointName
   * @return
   */
  public static SolTopicInfo[] getTopicInfo(JCSMPSession session, String endpointName, String vpn,
      String sempVersion) {
    XMLMessageConsumer cons = null;
    XMLMessageProducer prod = null;
    Map<String, SolTopicInfo> tiMap = new HashMap<String, SolTopicInfo>();
    try {
      // Create a producer and a consumer, and connect to appliance.
      prod = session.getMessageProducer(new PubCallback());
      cons = session.getMessageConsumer(new SubCallback());
      cons.start();

      if (vpn == null) vpn = (String) session.getProperty(JCSMPProperties.VPN_NAME);
      if (sempVersion == null) sempVersion = getSempVersion(session);

      // Extract the router name.
      final String SEMP_SHOW_TE_TOPICS = "<rpc semp-version=\""
          + sempVersion
          + "\"><show><topic-endpoint><name>"
          + endpointName
          + "</name><vpn-name>"+ vpn + "</vpn-name></topic-endpoint></show></rpc>";

      RpcReply teTopics =  sendRequest(session, SEMP_SHOW_TE_TOPICS);

      for (TopicEndpoint2 te : teTopics.getRpc().getShow().getTopicEndpoint().getTopicEndpoints()
          .getTopicEndpointArray()) {
        SolTopicInfo ti = new SolTopicInfo();
        ti.setBindCount(te.getInfo().getBindCount());
        //qi.setDescription(qt.getInfo().getNetworkTopic());
        ti.setEndpoint(te.getName());
        ti.setMessageVPN(te.getInfo().getMessageVpn());
        ti.setTopic(te.getInfo().getDestination());
        ti.setDurable(te.getInfo().getDurable());
        ti.setInSelPres(te.getInfo().getIngressSelectorPresent());
        ti.setHwmMB(formatter.format(te.getInfo().getHighWaterMarkInMb()));
        ti.setSpoolUsageMB(formatter.format(te.getInfo().getCurrentSpoolUsageInMb()));
        ti.setMessagesSpooled(te.getInfo().getNumMessagesSpooled().longValue());
        String status = te.getInfo().getIngressConfigStatus().substring(0, 1).toUpperCase();
        status += " " + te.getInfo().getEgressConfigStatus().substring(0, 1).toUpperCase();
        status += " " + te.getInfo().getIngressSelectorPresent().substring(0, 1).toUpperCase();
        status += " " + te.getInfo().getType().substring(0, 1).toUpperCase();
        ti.setStatus(status);

        tiMap.put(ti.getEndpoint(), ti);
      }

    } catch (JCSMPException e) {

      throw new RuntimeException(e.getMessage(), e);
    } finally {
      if (cons != null)
        cons.close();
      if (prod != null)
        prod.close();
    }
    return tiMap.values().toArray(new SolTopicInfo[0]);
  }

  /**
   * Return the SolQueueInfo for this queue (or all queues if 'queue' is null).
   * 
   * @param session
   * @param queue
   * @param vpn (if null, use the session's vpn name)
   * @param sempVersion, if null use 'soltr/7_1_1'
   * @return
   */
  public static SolQueueInfo[] getQueueInfo(JCSMPSession session, String queue, String vpn,
      String sempVersion) {
    XMLMessageConsumer cons = null;
    XMLMessageProducer prod = null;
    Map<String, SolQueueInfo> qiMap = new HashMap<String, SolQueueInfo>();
    try {
      // Create a producer and a consumer, and connect to appliance.
      prod = session.getMessageProducer(new PubCallback());
      cons = session.getMessageConsumer(new SubCallback());
      cons.start();

      if (vpn == null) vpn = (String) session.getProperty(JCSMPProperties.VPN_NAME);
      if (sempVersion == null) sempVersion = getSempVersion(session);

      // Extract the router name.

      final String SEMP_SHOW_QUEUE_SUBS = "<rpc semp-version=\""
          + sempVersion
          + "\"><show><queue><name>"
          + queue
          + "</name><vpn-name>"+ vpn + "</vpn-name><subscriptions/><count/><num-elements>200</num-elements></queue></show></rpc>";

      RpcReply queueSubs = sendRequest(session, SEMP_SHOW_QUEUE_SUBS);

      for (QueueType qt : queueSubs.getRpc().getShow().getQueue().getQueues().getQueueArray()) {
        SolQueueInfo qi = new SolQueueInfo();
        qi.setBindCount(qt.getInfo().getBindCount());
        //qi.setDescription(qt.getInfo().getNetworkTopic());
        qi.setName(qt.getName());
        qi.setMessageVPN(qt.getInfo().getMessageVpn());
        qi.setDurable(qt.getInfo().getDurable());
        qi.setEgSelPres(qt.getInfo().getEgressSelectorPresent());
        qi.setHwmMB(formatter.format(qt.getInfo().getHighWaterMarkInMb()));
        qi.setMessagesSpooled(qt.getInfo().getNumMessagesSpooled().longValue());
        qi.setSpoolUsageMB(formatter.format(qt.getInfo().getCurrentSpoolUsageInMb()));
        String status = qt.getInfo().getIngressConfigStatus().substring(0, 1).toUpperCase();
        status += " " + qt.getInfo().getEgressConfigStatus().substring(0, 1).toUpperCase();
        status += " " + qt.getInfo().getAccessType().substring(0, 1).toUpperCase();
        status += " " + qt.getInfo().getEgressSelectorPresent().substring(0, 1).toUpperCase();
        status += " " + qt.getInfo().getType().substring(0, 1).toUpperCase();
        status += qt.getInfo().getDurable() ? " D" : " N";
        qi.setStatus(status);

        for (Subscription sub : qt.getSubscriptions().getSubscriptionArray()) {
          qi.addSubscription(sub.getTopic());
        }

        qiMap.put(qi.getName(), qi);
      }

    } catch (JCSMPException e) {

      throw new RuntimeException(e.getMessage(), e);
    } finally {
      if (cons != null)
        cons.close();
      if (prod != null)
        prod.close();
    }
    return qiMap.values().toArray(new SolQueueInfo[0]);
  }

  private static String getSempVersion(JCSMPSession session)
  {
    String retval = "soltr/7_1_1";
    try {
      String peerVersion = (String)session.getCapability(CapabilityType.PEER_SOFTWARE_VERSION);
      if (peerVersion != null)
      {
        retval = "soltr/";
        String[] version = peerVersion.split("\\.");
        retval += version[0];
        retval += "_" + version[1];
        if (!version[2].equals("0")) retval += "_" + version[2];
      }
    } catch (Throwable e) {
      System.err.println(e);
    }
    return retval;
  }

  private static RpcReply sendRequest(JCSMPSession session,
      final String requestStr)  {
    try {
      // Set up the requestor and request message.
      String routerName = (String) session
          .getCapability(CapabilityType.PEER_ROUTER_NAME);

      final String SEMP_TOPIC_STRING = String.format("#SEMP/%s/SHOW",
          routerName);
      final Topic SEMP_TOPIC = JCSMPFactory.onlyInstance().createTopic(
          SEMP_TOPIC_STRING);
      Requestor requestor = session.createRequestor();
      BytesXMLMessage requestMsg = JCSMPFactory.onlyInstance().createMessage(
          BytesXMLMessage.class);
      requestMsg.writeAttachment(requestStr.getBytes());

      BytesXMLMessage replyMsg = requestor
          .request(requestMsg, 5000, SEMP_TOPIC);

      String replyStr = new String();
      if (replyMsg.getAttachmentContentLength() > 0) {
        byte[] bytes = new byte[replyMsg.getAttachmentContentLength()];
        replyMsg.readAttachmentBytes(bytes);
        replyStr = new String(bytes, "US-ASCII");
      }

      RpcReplyDocument doc = RpcReplyDocument.Factory.parse(replyStr);

      RpcReply reply = doc.getRpcReply();

      if (reply.isSetPermissionError()) {
        throw new RuntimeException(
            "Permission Error: Make sure SEMP over message bus SHOW commands are enabled for this VPN");
      }

      if( reply.isSetParseError() ) {
        throw new RuntimeException( "SEMP Parse Error: " + reply.getParseError() );
      }

      if( reply.isSetLimitError() ) {
        throw new RuntimeException( "SEMP Limit Error: " + reply.getLimitError() );
      }

      if( reply.isSetExecuteResult() && reply.getExecuteResult().isSetReason() ) { // axelp: encountered this error on invalid 'queue' name
        throw new RuntimeException( "SEMP Execution Error: " + reply.getExecuteResult().getReason() );
      }

      return reply;
    } catch (JCSMPException e) {

      throw new RuntimeException(e.getMessage(), e);
    } catch (UnsupportedEncodingException e) {

      throw new RuntimeException(e.getMessage(), e);
    } catch (XmlException e) {

      throw new RuntimeException(e.getMessage(), e);
    }
  }

Upvotes: 0

Russell Sim
Russell Sim

Reputation: 1733

You will need to make use of SEMP over the management interface for this.

Sample commands:

curl -d '<rpc><show><queue><name>*</name></queue></show></rpc>' -u semp_username:semp_password http://your_management_ip:your_management_port/SEMP
curl -d '<rpc><show><topic-endpoint><name>*</name></topic-endpoint></show></rpc>' -u semp_username:semp_password http://your_management_ip:your_management_port/SEMP

Note that I'm using curl for simplicity, but any application can perform HTTP POSTs to execute these commands. If you are using Java, you can refer to the SempHttpSetRequest sample found within the Solace API samples.

Documentation on SEMP can be found here.


However, the larger question here is why do you need to discover all destinations?

One of the features of the message broker is to decouple the publishers and consumers.

If you need to know if your persistent message is being published to a topic with no consumers, you can make use of the reject-msg-to-sender-on-no-subscription-match setting in the publishing application's client-profile. This means that the publisher will obtain a negative acknowledgement in the event that it tries to publish a message on a topic that has no matching subscribers.

You can refer to "Handling Guaranteed Messages with No Matches" at https://docs.solace.com/Configuring-and-Managing/Configuring-Client-Profiles.htm for further details.

Upvotes: 0

Related Questions