Reputation: 105
I am just learning Quarkus and Reactive Messaging. I am trying to message between two components. The examples I have found have demonstrated streams that have a known data set that is streamed or are repeating a payload continuously. (Such as from the Kafka Quickstart which continually streams a new random number as the price)
I need to put an event on the stream only when certain events occur in the business logic. Are there any examples?
I did find this post on StackOverflow, Is there any function in Quarkus to send message to Kafka. However, there are two problems:
I cannot get this form to work.
UPDATE: @iabughosh
Thank you. But I am still getting an Emitter that is null injected. Here are the relevant code snippets:
mp.messaging.outgoing.ownercreated.connector=smallrye-kafka
mp.messaging.outgoing.ownercreated.topic=ownercreated
mp.messaging.outgoing.ownercreated.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer
`public class Owner {
@Inject
@Channel("ownercreated")
private static Emitter<Owner> ownerCreatedChannel;
public void persist() {
Owner.ownerCreatedChannel.send(this);
}
}`
I've injected as instance var, too.
UPDATE #2 at request of @iabughosh - Thank you for your help!
package org.boosey;
import io.smallrye.reactive.messaging.annotations.Channel;
import io.smallrye.reactive.messaging.annotations.Emitter;
import java.util.logging.Logger;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
@ApplicationScoped
public class Owner {
private final Logger logger = Logger.getLogger(Owner.class.getName());
@Inject
@Channel("ownercreated")
private Emitter<Owner> ownerCreatedChannel;
public String name;
public String email;
public void persist() {
logger.info("IN PERSIST");
ownerCreatedChannel.send(this);
logger.info("SENT NEW OWNER");
}
}
application.properties:
mp.messaging.outgoing.ownercreated.connector=smallrye-kafka
mp.messaging.outgoing.ownercreated.topic=ownercreated
mp.messaging.outgoing.ownercreated.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer
The Owner.persist method is being called from a Quarkus REST Resource class. I have verified that a properly instantiated Owner object is received in Owner.persist.
@Path("/owner")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@ApplicationScoped
public class OwnerResource {
@POST
public Response create(Owner owner) {
owner.persist();
return Response.status(201).build();
}
}
Upvotes: 2
Views: 2097
Reputation: 551
If you have an outgoing
topic configured correctly in application.properties file, all you need to do is to inject Emitter like this :
@Inject
@Channel("your-channel")
Emitter<String> outgoingChannel;
and in your function you can call :
outgoingChannel.send(msg);
where your-channel will look like this in config file :
mp.messaging.outgoing.your-channel.topic=kafka-topic
Update : Move Emitter (Along with annotations) code to OwnerResource, it should work smoothly. Also you can remove @ApplicationScoped from Owner if you moved that code. What happening here is Owner object is not created by CDI, that's why it is not injecting any other objects. Regards.
Upvotes: 4