Bill Landers

Reputation: 111

Akka/Camel UntypedConsumerActor not consuming from file-based queue

I'm trying to put together my first Akka/Camel application from "scratch" (read, "noob") using the following lib versions:

According to all of the documentation I can find (Akka docs, user groups, etc.) all I have to do to consume from a file-based queue is set up my system this way:

Main class:

actorSystem = ActorSystem.create("my-system");

Props props = new Props(Supervisor.class);
ActorRef supervisor = actorSystem.actorOf(props, "supervisor");

Camel camel = CamelExtension.get(actorSystem);
CamelContext camelContext = camel.context();
camelContext.start();

Supervisor class:

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.camel.javaapi.UntypedConsumerActor;
import org.apache.camel.Message;

/**
 * Manages creation and supervision of UploadBatchWorkers.
 */
public class Supervisor extends UntypedConsumerActor {    
    @Override
    public String getEndpointUri() {
        return "file:///Users/myhome/queue";
    }

    @Override
    public void preStart() {
        String test = "test";
    }

    @Override
    public void onReceive(Object message) {
        if (message instanceof CamelMessage) {
            // do something
        }
    }
}

My problem is that the supervisor never consumes from the defined endpoint, even though another application is producing messages to that same endpoint. I know the supervisor object is being created: the debugger stops on the "test" line in preStart(), and if I explicitly "tell" it something, it processes the message fine.

Any idea what I'm doing wrong?

Upvotes: 1

Views: 412

Answers (2)

Bill Landers

Reputation: 111

Ok, the problem was my own fault and is clearly visible in the example code if you look at the Consumer trait from which the UntypedConsumerActor inherits.

This method:

@Override
public void preStart() {
    String test = "test";
}

overrides its parent's preStart() method, right? Well, that parent method is actually the one that registers the consumer with the on-the-fly created endpoint, so while you can override it, you must call super.preStart() from your override or the consumer is never registered and nothing gets consumed.
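To illustrate the mechanism without pulling in Akka, here is a minimal sketch in plain Java. BaseConsumer, BrokenConsumer, FixedConsumer and PreStartDemo are hypothetical stand-ins invented for this example, not Akka classes; the only assumption is that the parent's preStart() performs the registration, as described above.

```java
// Hypothetical stand-in for a framework base class (NOT the Akka API):
// its preStart() performs the registration that makes the consumer work.
class BaseConsumer {
    private boolean registered = false;

    // Lifecycle hook; the base implementation does the essential setup.
    public void preStart() {
        registered = true;
    }

    public boolean isRegistered() {
        return registered;
    }
}

// Mirrors the bug: the override replaces the parent's hook entirely.
class BrokenConsumer extends BaseConsumer {
    @Override
    public void preStart() {
        String test = "test"; // the parent's registration never runs
    }
}

// Mirrors the fix: delegate to the parent, then do the custom work.
class FixedConsumer extends BaseConsumer {
    @Override
    public void preStart() {
        super.preStart(); // run the parent's registration first
        String test = "test";
    }
}

public class PreStartDemo {
    public static void main(String[] args) {
        BaseConsumer broken = new BrokenConsumer();
        broken.preStart();

        BaseConsumer fixed = new FixedConsumer();
        fixed.preStart();

        System.out.println("broken registered: " + broken.isRegistered());
        System.out.println("fixed registered: " + fixed.isRegistered());
    }
}
```

In the Supervisor class from the question, the equivalent change would be adding super.preStart() as the first statement of the overridden preStart().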

Hope this is useful to someone down the road!

Upvotes: 2

cmbaxter

Reputation: 35453

Try changing your instanceof inside of onReceive to this:

if (message instanceof CamelMessage){
  //do processing here
}

Where CamelMessage is from package akka.camel. That's what the examples in the akka camel docs are doing.

Upvotes: 0
