John Boudreaux

Reputation: 105

Reactive Messaging: Emit events when needed (using Kafka)

I am just learning Quarkus and Reactive Messaging, and I am trying to send messages between two components. The examples I have found either stream a known, fixed data set or repeat a payload continuously (for example, the Kafka Quickstart, which continually streams a new random number as the price).

I need to put an event on the stream only when certain events occur in the business logic. Are there any examples?

I did find this post on StackOverflow: "Is there any function in Quarkus to send message to Kafka". However, I cannot get that approach to work, and there are two problems:

  1. The emitter is always null.
  2. I am attempting to do this purely with Reactive Messaging, without Kafka-specific details leaking into the application code.

UPDATE: @iabughosh

Thank you. But the injected Emitter is still null. Here are the relevant code snippets:

mp.messaging.outgoing.ownercreated.connector=smallrye-kafka
mp.messaging.outgoing.ownercreated.topic=ownercreated
mp.messaging.outgoing.ownercreated.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer

`public class Owner  {

    @Inject
    @Channel("ownercreated")
    private static Emitter<Owner> ownerCreatedChannel;

    public void persist() {
        Owner.ownerCreatedChannel.send(this);
    }
}`

I've also tried injecting it as an instance variable.

UPDATE #2 at request of @iabughosh - Thank you for your help!

package org.boosey;

import io.smallrye.reactive.messaging.annotations.Channel;
import io.smallrye.reactive.messaging.annotations.Emitter;
import java.util.logging.Logger;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

@ApplicationScoped
public class Owner {
    private final Logger logger = Logger.getLogger(Owner.class.getName());

    @Inject
    @Channel("ownercreated")
    private Emitter<Owner> ownerCreatedChannel;

    public String name;
    public String email;

    public void persist() {
        logger.info("IN PERSIST");

        ownerCreatedChannel.send(this);

        logger.info("SENT NEW OWNER");
    }
}

application.properties:

mp.messaging.outgoing.ownercreated.connector=smallrye-kafka
mp.messaging.outgoing.ownercreated.topic=ownercreated
mp.messaging.outgoing.ownercreated.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer

The Owner.persist method is being called from a Quarkus REST Resource class. I have verified that a properly instantiated Owner object is received in Owner.persist.

@Path("/owner")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@ApplicationScoped
public class OwnerResource {

    @POST
    public Response create(Owner owner) {
        owner.persist();
        return Response.status(201).build();
    }
}

Upvotes: 2

Views: 2097

Answers (1)

iabughosh

Reputation: 551

If you have an outgoing channel configured correctly in the application.properties file, all you need to do is inject an Emitter like this:

@Inject
@Channel("your-channel")
Emitter<String> outgoingChannel;

and in your method you can call:

outgoingChannel.send(msg);

where your-channel is configured like this in the config file:

mp.messaging.outgoing.your-channel.topic=kafka-topic

Update: Move the Emitter field (along with its annotations) to OwnerResource and it should work smoothly. If you move that code, you can also remove @ApplicationScoped from Owner. What is happening here is that the Owner object passed to create() is not created by CDI (it is built from the request body), so CDI never injects anything into it. Regards.
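To illustrate why the field stays null, here is a minimal plain-Java sketch of the idea. It does not use Quarkus or CDI at all: FakeInject, FakeEmitter, ToyContainer, SelfPublishingOwner, OwnerData and OwnerResourceSketch are all hypothetical stand-ins invented for this example, and the toy container only roughly imitates what a real container does. The point it tries to show is that only objects created by the container get their annotated fields filled in.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for @Inject; NOT the real javax.inject.Inject.
@Retention(RetentionPolicy.RUNTIME)
@interface FakeInject {}

// Hypothetical stand-in for Emitter<T>; it records payloads instead of publishing them.
class FakeEmitter<T> {
    final List<T> sent = new ArrayList<>();
    void send(T payload) { sent.add(payload); }
}

// Shape from the question: the data object itself expects an injected emitter.
class SelfPublishingOwner {
    @FakeInject FakeEmitter<SelfPublishingOwner> ownerCreatedChannel;
    void persist() { ownerCreatedChannel.send(this); }
}

// Plain data object, like one built from a request body.
class OwnerData {
    String name;
    String email;
}

// Shape from the answer: the container-managed resource owns the emitter.
class OwnerResourceSketch {
    @FakeInject FakeEmitter<OwnerData> ownerCreatedChannel;
    void create(OwnerData owner) { ownerCreatedChannel.send(owner); }
}

// Toy container: only instances created through create() get their fields filled.
class ToyContainer {
    <T> T create(Class<T> type) {
        try {
            T instance = type.getDeclaredConstructor().newInstance();
            for (Field field : type.getDeclaredFields()) {
                if (field.isAnnotationPresent(FakeInject.class)) {
                    field.setAccessible(true);
                    field.set(instance, new FakeEmitter<>());
                }
            }
            return instance;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not create " + type.getName(), e);
        }
    }
}

class InjectionDemo {
    public static void main(String[] args) {
        // Created with `new`, so the toy container never sees it and the field stays null.
        SelfPublishingOwner manual = new SelfPublishingOwner();
        System.out.println("emitter is null: " + (manual.ownerCreatedChannel == null));

        // Created by the toy container, so the emitter field is populated.
        OwnerResourceSketch resource = new ToyContainer().create(OwnerResourceSketch.class);
        OwnerData owner = new OwnerData();
        owner.name = "Ada";
        resource.create(owner);
        System.out.println("events sent: " + resource.ownerCreatedChannel.sent.size());
    }
}
```

In the real application the same split would apply: OwnerResource is managed by the container and can hold the @Inject @Channel("ownercreated") Emitter, while Owner stays a plain data object that the resource passes to send().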

Upvotes: 4
