Reputation: 602
I am integrating JGroups with Axon Framework, following this link. I made some changes to the code and am running the project without Docker. My code is as follows:
Main Class -
/**
 * Entry point that runs both demo nodes inside a single JVM.
 */
public class ClusterRunner {
    /**
     * Constructs the primary node, then the secondary node, and starts each one on its own thread.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        // Both nodes are fully constructed on the calling thread before either thread is started.
        Runnable primary = new PrimaryNode();
        Runnable secondary = new SecondaryNode();

        Thread primaryThread = new Thread(primary);
        Thread secondaryThread = new Thread(secondary);

        primaryThread.start();
        secondaryThread.start();
    }
}
Primary Node -
import org.axonframework.commandhandling.AggregateAnnotationCommandHandler;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.SimpleCommandBus;
import org.axonframework.commandhandling.distributed.AnnotationRoutingStrategy;
import org.axonframework.commandhandling.distributed.DistributedCommandBus;
import org.axonframework.commandhandling.distributed.commandfilter.AcceptAll;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.commandhandling.gateway.DefaultCommandGateway;
import org.axonframework.commandhandling.model.Repository;
import org.axonframework.eventsourcing.EventSourcingRepository;
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore;
import org.axonframework.eventsourcing.eventstore.EventStore;
import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine;
import org.axonframework.jgroups.commandhandling.JGroupsConnector;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.jgroups.JChannel;
/**
 * Cluster node that sends {@code CreateItem} commands through a JGroups-backed distributed command bus.
 *
 * <p>The node keeps its own in-memory event store, so events are only stored in the JVM-local store of
 * whichever node ends up handling a given command.
 */
public class PrimaryNode implements Runnable {
    private JGroupsConnector connector;
    private final CommandGateway commandGateway;
    private final EventStore eventStore;
    private final CommandBus commandBus;

    /**
     * Creates the event store, joins the cluster, registers the {@code Item} aggregate's command
     * handlers on the distributed command bus and prepares the command gateway.
     *
     * @throws IllegalStateException if the distributed command bus cannot be set up
     */
    public PrimaryNode() {
        eventStore = new EmbeddedEventStore(new InMemoryEventStorageEngine());
        try {
            commandBus = configureDistributedCommandBus();
        } catch (Exception e) {
            // Fail fast with the real cause instead of continuing with a null command bus,
            // which would only surface later as an unrelated NullPointerException.
            throw new IllegalStateException("Primary node could not set up the distributed command bus", e);
        }
        Repository<Item> repository = new EventSourcingRepository<>(Item.class, eventStore);
        new AggregateAnnotationCommandHandler<>(Item.class, repository).subscribe(commandBus);
        commandGateway = new DefaultCommandGateway(commandBus);
    }

    /**
     * Sends five {@code CreateItem} commands (item ids "0" to "4"), waiting for each one to complete.
     */
    @Override
    public void run() {
        for (int a = 0; a < 5; a++) {
            // Read the clock once so the logged value is exactly the one sent in the command.
            String timestamp = Long.toString(System.currentTimeMillis());
            System.out.println("Primary Node Created item " + a + " id: " + timestamp);
            commandGateway.sendAndWait(new CreateItem(Long.toString(a), timestamp));
        }
    }

    /**
     * Builds the distributed command bus: a {@link SimpleCommandBus} as local segment, connected to the
     * "axon-jgroups-demo" cluster through a {@link JGroupsConnector} configured from {@code tcp.xml}.
     *
     * @return the distributed command bus backed by the connector
     * @throws Exception if the channel configuration cannot be read or the cluster cannot be joined
     */
    private CommandBus configureDistributedCommandBus() throws Exception {
        CommandBus localSegment = new SimpleCommandBus();
        JChannel channel;
        // Fully-qualified type name keeps this block independent of the file's import list.
        try (java.io.InputStream config = getClass().getClassLoader().getResourceAsStream("tcp.xml")) {
            if (config == null) {
                throw new IllegalStateException("JGroups configuration 'tcp.xml' was not found on the classpath");
            }
            channel = new JChannel(config);
        }
        connector = new JGroupsConnector(localSegment, channel, "axon-jgroups-demo", new XStreamSerializer(),
                new AnnotationRoutingStrategy());
        // Registers this member with a value of 100 and a filter that accepts every command.
        connector.updateMembership(100, AcceptAll.INSTANCE);
        connector.connect();
        connector.awaitJoined();
        return new DistributedCommandBus(connector, connector);
    }
}
Secondary Node -
import org.axonframework.commandhandling.AggregateAnnotationCommandHandler;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.SimpleCommandBus;
import org.axonframework.commandhandling.distributed.AnnotationRoutingStrategy;
import org.axonframework.commandhandling.distributed.DistributedCommandBus;
import org.axonframework.commandhandling.distributed.commandfilter.AcceptAll;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.commandhandling.gateway.DefaultCommandGateway;
import org.axonframework.commandhandling.model.Repository;
import org.axonframework.eventhandling.EventListener;
import org.axonframework.eventhandling.SimpleEventHandlerInvoker;
import org.axonframework.eventhandling.SubscribingEventProcessor;
import org.axonframework.eventsourcing.EventSourcingRepository;
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore;
import org.axonframework.eventsourcing.eventstore.EventStore;
import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine;
import org.axonframework.jgroups.commandhandling.JGroupsConnector;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.jgroups.JChannel;
/**
 * Cluster node that joins the same JGroups cluster as the primary node and prints the payload of
 * every event published to its own in-memory event store.
 */
public class SecondaryNode implements Runnable {
    private JGroupsConnector connector;
    private final EventStore eventStore;

    /**
     * Creates the event store, joins the cluster and registers the {@code Item} aggregate's command
     * handlers on the distributed command bus.
     *
     * @throws IllegalStateException if the distributed command bus cannot be set up
     */
    public SecondaryNode() {
        eventStore = new EmbeddedEventStore(new InMemoryEventStorageEngine());
        CommandBus commandBus;
        try {
            commandBus = configureDistributedCommandBus();
        } catch (Exception e) {
            // Fail fast with the real cause instead of continuing with a null command bus,
            // which would only surface later as an unrelated NullPointerException.
            throw new IllegalStateException("Secondary node could not set up the distributed command bus", e);
        }
        Repository<Item> repository = new EventSourcingRepository<>(Item.class, eventStore);
        new AggregateAnnotationCommandHandler<>(Item.class, repository).subscribe(commandBus);
    }

    /**
     * Starts a subscribing event processor on this node's event store that prints each event payload.
     */
    @Override
    public void run() {
        // NOTE(review): the processor is only started here, on the node's thread. Events published to
        // this store before start() is reached are presumably not delivered to it - confirm, and
        // consider starting the processor in the constructor if every event must be seen.
        new SubscribingEventProcessor("processor", new SimpleEventHandlerInvoker((EventListener) event -> {
            System.out.println("Secondary Node -- " + event.getPayload());
        }), eventStore).start();
    }

    /**
     * Builds the distributed command bus: a {@link SimpleCommandBus} as local segment, connected to the
     * "axon-jgroups-demo" cluster through a {@link JGroupsConnector} configured from {@code tcp_test.xml}.
     *
     * @return the distributed command bus backed by the connector
     * @throws Exception if the channel configuration cannot be read or the cluster cannot be joined
     */
    private CommandBus configureDistributedCommandBus() throws Exception {
        CommandBus localSegment = new SimpleCommandBus();
        JChannel channel;
        // Fully-qualified type name keeps this block independent of the file's import list.
        try (java.io.InputStream config = getClass().getClassLoader().getResourceAsStream("tcp_test.xml")) {
            if (config == null) {
                throw new IllegalStateException("JGroups configuration 'tcp_test.xml' was not found on the classpath");
            }
            channel = new JChannel(config);
        }
        connector = new JGroupsConnector(localSegment, channel, "axon-jgroups-demo", new XStreamSerializer(),
                new AnnotationRoutingStrategy());
        // Registers this member with a value of 100 and a filter that accepts every command.
        connector.updateMembership(100, AcceptAll.INSTANCE);
        connector.connect();
        connector.awaitJoined();
        return new DistributedCommandBus(connector, connector);
    }
}
Item -
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.commandhandling.TargetAggregateIdentifier;
import org.axonframework.commandhandling.model.AggregateIdentifier;
import org.axonframework.eventhandling.EventHandler;
import static org.axonframework.commandhandling.model.AggregateLifecycle.apply;
/**
 * Command requesting the creation of an item.
 */
class CreateItem {
    /** Identifier of the aggregate this command targets. */
    @TargetAggregateIdentifier
    private final String itemId;
    private final String name;

    /**
     * @param itemId identifier of the item to create
     * @param name   name to give the item
     */
    public CreateItem(String itemId, String name) {
        this.name = name;
        this.itemId = itemId;
    }

    /** @return the identifier of the item to create */
    public String getItemId() {
        return this.itemId;
    }

    /** @return the name to give the item */
    public String getName() {
        return this.name;
    }
}
/**
 * Event stating that an item with the given identifier and name has been created.
 */
class ItemCreated {
    private final String itemId;
    private final String name;

    /**
     * @param itemId identifier of the created item
     * @param name   name of the created item
     */
    public ItemCreated(String itemId, String name) {
        this.name = name;
        this.itemId = itemId;
    }

    /** @return the identifier of the created item */
    public String getItemId() {
        return this.itemId;
    }

    /** @return the name of the created item */
    public String getName() {
        return this.name;
    }

    /** @return the item id and the name separated by a single space */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append(itemId).append(" ").append(name);
        return text.toString();
    }
}
/**
 * Aggregate for a single item; its state is set from {@code ItemCreated} events.
 */
class Item {
    @AggregateIdentifier
    private String itemId;
    private String name;

    /**
     * No-argument constructor.
     * NOTE(review): presumably needed by the framework to instantiate the aggregate before replaying
     * its events - confirm.
     */
    public Item() {
    }

    /**
     * Handles the creation command by applying an {@code ItemCreated} event with the same id and name.
     *
     * @param command the creation request
     */
    @CommandHandler
    public Item(CreateItem command) {
        ItemCreated created = new ItemCreated(command.getItemId(), command.getName());
        apply(created);
    }

    /**
     * Copies the identifier and name from the event into this aggregate.
     *
     * @param event the event describing the created item
     */
    @EventHandler
    public void itemCreated(ItemCreated event) {
        this.itemId = event.getItemId();
        this.name = event.getName();
    }
}
Now my problem is that when I run the main class, the primary node produces 5 events, but the secondary node does not receive all of them. It may get 2, 3 or 4 events, but not all. I want all of the events to be delivered to the secondary node. I am very new to Axon Framework and JGroups. Please help me understand what the problem is here.
Upvotes: 0
Views: 448
Reputation: 2910
By default, Axon will subscribe each of your Event Handlers to the Event Bus (in your case the EmbeddedEventStore). That means a handler is invoked when that specific local instance publishes an Event. And that Event is published when handling a Command. So essentially, the Event Handlers are invoked on the Node that handles the command.
Alternatively, you can configure your event handlers to run in "tracking" mode. In that mode, they open a connection to the Event Store and, depending on the exact configuration, each node can pick up its own copy of the event, regardless of where it was published.
Upvotes: 1
Reputation: 602
So after trying everything, I decided to experiment with the routing strategy. I decided to use AbstractRoutingStrategy, which basically helps decide the routing of command messages that don't have a decisive destination. The following is the working code for the Primary node (the sender) of the JGroups cluster. Modify the configureDistributedCommandBus() method of the PrimaryNode class as follows -
/**
 * Builds the distributed command bus with a routing strategy that depends on the current cluster view.
 *
 * <p>When the cluster view has exactly two members, every command gets the fixed routing key
 * "secondary"; otherwise the command message's own identifier is used as the routing key.
 *
 * @return the distributed command bus backed by the JGroups connector
 * @throws Exception if the channel cannot be created or the cluster cannot be joined
 */
private CommandBus configureDistributedCommandBus() throws Exception {
    CommandBus commandBus = new SimpleCommandBus();
    channel = new JChannel(getClass().getClassLoader().getResourceAsStream("tcp.xml"));
    RoutingStrategy rs = new AbstractRoutingStrategy(UnresolvedRoutingKeyPolicy.STATIC_KEY) {
        @Override
        protected String doResolveRoutingKey(CommandMessage<?> cmdMsg) {
            // The view can be absent while the channel is not connected; treat that like "not two members".
            View view = channel.getView();
            if (view != null && view.getMembers().size() == 2) {
                // NOTE(review): "secondary" is only a routing key, not a node address. A constant key
                // presumably sends all commands to one member - confirm it is the intended one.
                return "secondary";
            }
            return cmdMsg.getIdentifier();
        }
    };
    connector = new JGroupsConnector(commandBus, channel, "axon-jgroups-demo", new XStreamSerializer(), rs);
    // Registers this member with a value of 100 and a filter that accepts every command.
    connector.updateMembership(100, AcceptAll.INSTANCE);
    connector.connect();
    connector.awaitJoined();
    return new DistributedCommandBus(connector, connector);
}
Since I am using JGroups, I can get the view of the cluster, i.e. how many nodes it contains. On that basis I decide how to route each command message.
Upvotes: 1