vels4j

Reputation: 11298

Apache Camel creating Consumer component

I'm new to Apache Camel. On HP NonStop there is a Receiver that receives events generated by the event manager; think of it as a stream. My goal is to set up a consumer endpoint that receives each incoming message and processes it through Camel.

At the other endpoint I simply need to write the message to the logs. From what I have read, I understand that for the consumer endpoint I need to create my own component, and that the configuration would look like this:

   from("myComp:receive").to("log:net.javaforge.blog.camel?level=INFO")

Here is the code snippet that receives messages from the event system.

    Receive receive = com.tandem.ext.guardian.Receive.getInstance();
    byte[] maxMsg = new byte[500]; // holds largest possible request
    short errorReturn = 0;
    int countRead = 0;             // number of bytes read
    boolean moreOpeners = true;
    do { // read messages from $receive until last close
        try {
            countRead = receive.read(maxMsg, maxMsg.length);
            // decode only the bytes that were actually read
            String receivedMessage = new String(maxMsg, 0, countRead, "UTF-8");
            // Here I need to hand receivedMessage over to Camel

        } catch (ReceiveNoOpeners ex) {
            moreOpeners = false;
        } catch (Exception e) {
            moreOpeners = false;
        }
    } while (moreOpeners);
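
When the buffer is turned into a String, only the bytes actually read should be decoded; otherwise the unused tail of the 500-byte buffer ends up in the message as NUL characters. Below is a small stand-alone sketch of the difference. The class and method names are made up for illustration, and the read from $receive is simulated with an array copy rather than the real Receive API.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper class, for illustration only.
class BufferDecoding {

    /** Decodes only the first countRead bytes of the buffer. */
    static String decode(byte[] buffer, int countRead) {
        return new String(buffer, 0, countRead, StandardCharsets.UTF_8);
    }

    /** Simulates a read: copies the payload into the buffer and returns the byte count. */
    static int simulateRead(byte[] buffer, String payload) {
        byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
        System.arraycopy(bytes, 0, buffer, 0, bytes.length);
        return bytes.length;
    }

    public static void main(String[] args) {
        byte[] maxMsg = new byte[500];
        int countRead = simulateRead(maxMsg, "event-1");

        // Decoding the whole buffer keeps the zero-filled tail.
        String whole = new String(maxMsg, StandardCharsets.UTF_8);
        // Decoding with offset and length keeps only the payload.
        String exact = decode(maxMsg, countRead);

        System.out.println("whole buffer length: " + whole.length());
        System.out.println("exact message: " + exact);
    }
}
```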

Can someone give me some hints on how to turn this into a consumer?

Upvotes: 3

Views: 2696

Answers (2)

vels4j

Reputation: 11298

I'm adding my own consumer component here in case it helps someone.

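    import org.apache.camel.AsyncCallback;
    import org.apache.camel.Exchange;
    import org.apache.camel.ExchangePattern;
    import org.apache.camel.Processor;
    import org.apache.camel.impl.DefaultConsumer; // Camel 2.x package

    public class MessageConsumer extends DefaultConsumer {

        private final MessageEndpoint endpoint;

        // volatile: written by doStop(), read by the worker thread
        private volatile boolean moreOpeners = true;

        public MessageConsumer(MessageEndpoint endpoint, Processor processor) {
            super(endpoint, processor);
            this.endpoint = endpoint;
        }

        @Override
        protected void doStart() throws Exception {
            super.doStart();
            moreOpeners = true;
            // Run the loop on its own thread so that doStart() returns
            // and Camel can finish starting the route.
            Thread worker = new Thread(new Runnable() {
                @Override
                public void run() {
                    readLoop();
                }
            }, "message-consumer");
            worker.setDaemon(true);
            worker.start();
        }

        private void readLoop() {
            int countRead = 0; // message counter, stands in for the real read

            do {
                countRead++;
                // Dummy payload; replace with the message read from $receive
                String msg = countRead + " " + System.currentTimeMillis();
                Exchange ex = endpoint.createExchange(ExchangePattern.InOnly);
                ex.getIn().setBody(msg);
                getAsyncProcessor().process(ex, new AsyncCallback() {
                    @Override
                    public void done(boolean doneSync) {
                        log.info("Message was processed " + (doneSync ? "synchronously" : "asynchronously"));
                    }
                });
            } while (moreOpeners);
        }

        @Override
        protected void doStop() throws Exception {
            moreOpeners = false; // lets the worker loop finish
            log.debug("Message processor is shut down");
            super.doStop();
        }
    }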

Upvotes: 0

Ralf

Reputation: 6853

The 10,000-foot view is this:

You need to start by implementing a component. The easiest way to get started is to extend org.apache.camel.impl.DefaultComponent. The only thing you have to do is override DefaultComponent::createEndpoint(..). As the name says, it creates your endpoint.

The next thing you need is the endpoint itself. Extend org.apache.camel.impl.DefaultEndpoint for this. At a minimum, override DefaultEndpoint::createConsumer(Processor) to create your own consumer. Since the Endpoint interface also declares createProducer(), a consume-only endpoint still has to implement it, for example by throwing UnsupportedOperationException.

Last but not least, you need to implement the consumer. Again, it is best to extend org.apache.camel.impl.DefaultConsumer. The consumer is where the code that generates your messages has to go. Through the constructor you receive a reference to your endpoint. Use the endpoint reference to create a new Exchange, populate it and send it on its way along the route. Something along the lines of

    Exchange ex = endpoint.createExchange(ExchangePattern.InOnly);
    setMyMessageHeaders(ex.getIn(), myMessagemetaData);
    setMyMessageBody(ex.getIn(), myMessage);

    getAsyncProcessor().process(ex, new AsyncCallback() {
        @Override
        public void done(boolean doneSync) {
            LOG.debug("Message was processed " + (doneSync ? "synchronously" : "asynchronously"));
        }
    });

I recommend picking a simple component (DirectComponent, perhaps) as an example to follow.
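
To make the hand-off order concrete, here is a dependency-free sketch of the chain described above: the component creates the endpoint, the endpoint creates the consumer, and the consumer pushes each message to the processor. All of the Sketch* classes are hypothetical stand-ins written for this illustration, not Camel types; a plain java.util.function.Consumer<String> plays the role of the Processor, and a String plays the role of the Exchange. In a real component these would be subclasses of DefaultComponent, DefaultEndpoint and DefaultConsumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins, NOT Camel classes: they only mirror the call order.
class SketchComponent {
    /** Mirrors createEndpoint(..): the component builds the endpoint for a URI. */
    SketchEndpoint createEndpoint(String uri) {
        return new SketchEndpoint(uri);
    }
}

class SketchEndpoint {
    final String uri;

    SketchEndpoint(String uri) {
        this.uri = uri;
    }

    /** Mirrors createConsumer(Processor): the endpoint builds the consumer. */
    SketchConsumer createConsumer(Consumer<String> processor) {
        return new SketchConsumer(this, processor);
    }
}

class SketchConsumer {
    private final SketchEndpoint endpoint;
    private final Consumer<String> processor;

    SketchConsumer(SketchEndpoint endpoint, Consumer<String> processor) {
        this.endpoint = endpoint;
        this.processor = processor;
    }

    /** Called once per incoming message; tags it with the endpoint URI and passes it on. */
    void onMessage(String body) {
        processor.accept("[" + endpoint.uri + "] " + body);
    }
}

class ChainDemo {
    /** Wires component -> endpoint -> consumer and feeds the given messages through. */
    static List<String> run(String... messages) {
        List<String> seen = new ArrayList<>();
        SketchEndpoint endpoint = new SketchComponent().createEndpoint("myComp:receive");
        SketchConsumer consumer = endpoint.createConsumer(seen::add);
        for (String message : messages) {
            consumer.onMessage(message);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run("first event", "second event"));
    }
}
```

In Camel itself you never call these factory methods yourself; the route definition (from("myComp:receive")...) triggers them once the component is registered under the myComp scheme.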

Upvotes: 7
