Jithin S

Reputation: 95

Is there a function in Quarkus to send a message to Kafka?

I'm new to Kafka and Quarkus. I want to send a message to a Kafka topic when a user request has been processed.

I have gone through the Kafka example provided in quarkus-quickstart and tried it with KafkaMessage:

// when GET called send message to topic
@GET
@Produces(MediaType.TEXT_PLAIN)
public String hello() {
    generateSingle();
    return "hello";
}

@Outgoing("single-stations")
public KafkaMessage<Integer, String> generateSingle() {
    return KafkaMessage.of(1, "value");
}

But the result was that messages were sent to the Kafka topic continuously.

I want to know whether there is another way to do this, or whether there is a problem with my code.

Any help is appreciated.

Upvotes: 2

Views: 3219

Answers (2)

Marc

Reputation: 1629

It's been a while since this was asked, but at present this can be achieved as follows:

import io.smallrye.reactive.messaging.kafka.Record;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

// The resource path is an assumption; the original snippet omitted @Path.
@Path("/hello")
public class ExampleProducer {

    // Emitter bound to the "single-stations" channel; nothing is sent until send() is called.
    @Inject
    @Channel("single-stations")
    Emitter<Record<Integer, String>> stationEmitter;

    // when GET is called, send one message to the topic
    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String hello() {
        var message = generateSingle();
        stationEmitter.send(message);
        return "hello";
    }

    // Builds the key/value record to send.
    public Record<Integer, String> generateSingle() {
        return Record.of(1, "value");
    }
}

The Emitter object lets you send messages on demand rather than continuously.

Much more information is available at https://quarkus.io/guides/kafka#sending-messages-to-kafka
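To make the difference concrete, here is a plain-Java analogy, not Quarkus code. It assumes that an @Outgoing method returning a single message is treated as a generator that the framework calls once per unit of downstream demand, which would explain the continuous sending seen in the question. The names PullVersusPush, drainGenerator and SimpleEmitter are hypothetical and exist only for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

class PullVersusPush {

    // Pull style: a stand-in "framework" calls the generator once per unit of demand,
    // so the number of messages is decided by the caller of this loop, not by your code.
    static List<String> drainGenerator(Supplier<String> generator, int demand) {
        List<String> sent = new ArrayList<>();
        for (int i = 0; i < demand; i++) {
            sent.add(generator.get());
        }
        return sent;
    }

    // Push style: a hypothetical emitter. Nothing is queued until send() is called.
    static final class SimpleEmitter {
        private final Queue<String> outbox = new ConcurrentLinkedQueue<>();

        void send(String message) {
            outbox.add(message);
        }

        // Returns everything sent so far and empties the outbox.
        List<String> drain() {
            List<String> sent = new ArrayList<>();
            String next;
            while ((next = outbox.poll()) != null) {
                sent.add(next);
            }
            return sent;
        }
    }
}
```

With the generator, three units of demand yield three copies of the message; with the emitter, one call to send() yields exactly one message.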

Upvotes: -1

Nikos Paraskevopoulos

Reputation: 40318

The documentation on this topic is terse and incomplete at this time (Quarkus 0.25.0). I managed to do it, but it took a lot of experimentation and something I believe is a hack that hopefully will be remedied in later versions of Quarkus.

The principle is that the @Outgoing method must produce a stream that is controlled externally. This is accomplished by creating the stream through Flowable.create() in a @PostConstruct method, and exposing the emitter to a class member. The @Outgoing method simply returns the already constructed stream.
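The same principle can be sketched with only the JDK, using java.util.concurrent.SubmissionPublisher in place of RxJava's Flowable. This is an analogy under that assumption, not the Quarkus API: the class ExternallyControlledStream and the helper sendAndCollect are hypothetical names introduced for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the component: the stream exists up front,
// and items enter it only when produce() is called.
class ExternallyControlledStream implements AutoCloseable {

    // Created once, playing the role of the stream built in @PostConstruct.
    private final SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

    // What an @Outgoing-style method would hand to the framework.
    Flow.Publisher<String> stream() {
        return publisher;
    }

    // Called by application code, for example from a REST endpoint.
    void produce(String message) {
        publisher.submit(message);
    }

    // Completes the stream, like emitter.onComplete() in @PreDestroy.
    @Override
    public void close() {
        publisher.close();
    }

    // Demo helper: subscribes, sends the given messages, and returns what arrived.
    static List<String> sendAndCollect(String... messages) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try (ExternallyControlledStream s = new ExternallyControlledStream()) {
            s.stream().subscribe(new Flow.Subscriber<String>() {
                public void onSubscribe(Flow.Subscription sub) { sub.request(Long.MAX_VALUE); }
                public void onNext(String item) { received.add(item); }
                public void onError(Throwable t) { done.countDown(); }
                public void onComplete() { done.countDown(); }
            });
            for (String m : messages) {
                s.produce(m);
            }
        }
        try {
            // Delivery is asynchronous, so wait for completion before reading the result.
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }
}
```

The subscriber only ever sees what produce() pushed in, which is the behaviour the question asks for: one message per request rather than a continuous stream.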

The following component exposes one public method, produce(String message), which sends that text message to Kafka:

package ...

import java.util.UUID;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.reactivestreams.Publisher;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.smallrye.reactive.messaging.kafka.KafkaMessage;

@ApplicationScoped
public class KafkaController {

    private FlowableEmitter<KafkaMessage<String, String>> emitter;

    private Flowable<KafkaMessage<String, String>> outgoingStream;

    @PostConstruct
    void init() {
        outgoingStream = Flowable.create(emitter -> this.emitter = emitter, BackpressureStrategy.BUFFER);
    }

    public void produce(String message) {
        emitter.onNext(KafkaMessage.of(UUID.randomUUID().toString(), message));
    }

    @PreDestroy
    void dispose() {
        emitter.onComplete();
    }

    @Outgoing("internal")
    Publisher<KafkaMessage<String, String>> produceKafkaMessage() {
        return outgoingStream;
    }

    @Incoming("internal")
    @Outgoing("kafka-test")
    KafkaMessage<String, String> transform(Message<KafkaMessage<String, String>> arg) {
        return arg.getPayload();
    }
}

I created this class in a generated Quarkus application, as documented here:

mvn io.quarkus:quarkus-maven-plugin:0.25.0:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=kafka-quickstart \
    -Dextensions="kafka"

It is configured in application.properties as follows:

kafka.bootstrap.servers=localhost:9092

mp.messaging.outgoing.kafka-test.connector=smallrye-kafka
mp.messaging.outgoing.kafka-test.topic=test
mp.messaging.outgoing.kafka-test.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.kafka-test.value.serializer=org.apache.kafka.common.serialization.StringSerializer

A Kafka instance is started exactly as described in the quickstart. You can watch the test topic with a console listener as follows:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic test --from-beginning --group test-console.consumer

To test it, you can create a JAX-RS resource to invoke produce():

package ...

import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;

@Path("/control")
public class KafkaProduceControlResource {

    @Inject
    KafkaController kafkaController;

    @POST
    @Path("/produce")
    public void produceMessage(String message) {
        kafkaController.produce(message);
    }
}

Invoke it from the command line as follows and watch the console consumer:

curl -i -s -X POST -d "A text message" \
    http://localhost:8080/control/produce

THE HACK: It seems that annotating produceKafkaMessage() with @Outgoing("kafka-test") directly fails, because Quarkus does NOT recognize that a KafkaMessage is already a Message and wraps it in another one, resulting in serialization errors. I bypass this by routing through the "internal" stream, where transform() unwraps the outer Message before the payload reaches Kafka.
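The symptom can be illustrated with a small plain-Java sketch. It does not reproduce Quarkus internals; Envelope, serializeString and unwrap are hypothetical stand-ins for a message wrapper, a String-only serializer and the transform() step.

```java
import java.nio.charset.StandardCharsets;

class DoubleWrapSketch {

    // Hypothetical minimal message envelope; not a SmallRye type.
    static final class Envelope<T> {
        private final T payload;

        Envelope(T payload) {
            this.payload = payload;
        }

        T getPayload() {
            return payload;
        }
    }

    // Stand-in for a String-only serializer: throws ClassCastException for anything else.
    static byte[] serializeString(Object value) {
        return ((String) value).getBytes(StandardCharsets.UTF_8);
    }

    // Mirrors the transform() step: strip the outer envelope to recover the inner one.
    static <T> Envelope<T> unwrap(Envelope<Envelope<T>> outer) {
        return outer.getPayload();
    }
}
```

Serializing the payload of a doubly wrapped envelope fails because the payload is itself an envelope rather than a String; after unwrap(), the payload is the original text and serializes normally.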

Upvotes: 3
