Consuming Kafka messages in a Quarkus application in a reactive way

Julio Santana
3 min read · Feb 5, 2023


I recently discovered the Quarkus framework and, coming from many years of using Spring, I have found many features that I like and did not have before.

Queue with a considerable lag ;)

Apart from its usual selling line, “a framework designed for the cloud” (it performs many tasks, such as dependency injection and property loading, at build time in order to start faster and consume less memory), it also has great support for containerization, a lovely developer mode and, last but not least, out-of-the-box support for reactive programming.

The idea behind this model may sound familiar if you have used Node before: a limited number of threads do all the work, as opposed to the traditional Java model, where new “heavy and expensive” threads were created whenever they were needed. For this purpose Quarkus relies on the Mutiny library, which helps when declaring reactive flows and whose philosophy is to avoid creating too many threads. HTTP requests, for example, are mostly processed on the I/O thread that receives them, whereas the usual Java web server relies on so-called worker threads to do this job.
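To make the "few threads, many tasks" idea a bit more concrete, here is a minimal sketch that uses only the JDK. It is an analogy under my own assumptions, not how Quarkus or Mutiny are implemented: the class FewThreadsSketch and the method runTasks are hypothetical names, and a small fixed thread pool stands in for the event-loop threads.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class FewThreadsSketch {

    // Runs taskCount short, non-blocking tasks on a pool of poolSize threads
    // and returns the names of the threads that actually did the work.
    static Set<String> runTasks(int taskCount, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        try {
            List<CompletableFuture<Void>> futures = IntStream.range(0, taskCount)
                    .mapToObj(i -> CompletableFuture.runAsync(
                            () -> threadNames.add(Thread.currentThread().getName()), pool))
                    .collect(Collectors.toList());
            // Wait until every task has finished
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        } finally {
            pool.shutdown();
        }
        return threadNames;
    }

    public static void main(String[] args) {
        Set<String> names = runTasks(1_000, 2);
        // The number of distinct threads can never exceed the pool size
        System.out.println("1000 tasks ran on " + names.size() + " thread(s)");
    }
}
```

The point of the sketch is that the number of threads stays fixed no matter how many tasks arrive. This only works well as long as no task blocks its thread, which is exactly the discipline the reactive model asks for.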

To illustrate this, I created the following example of how to consume messages coming from Kafka using Quarkus.

A good starting point is the .properties file, where we define the binding between the Kafka topic storing our messages and the channel, which is the object Quarkus uses to connect to a topic.

mp.messaging.incoming.products.connector=smallrye-kafka
mp.messaging.incoming.products.topic=product-topic
mp.messaging.incoming.products.failure-strategy=dead-letter-queue
mp.messaging.incoming.products.dead-letter-queue.topic=product-topic-dlq

In the example, we declare that we will use the smallrye-kafka connector, which is the usual choice (although others exist, such as an in-memory connector for testing). In the second line we connect the channel products to the topic product-topic; the incoming part of the key marks it as an incoming channel, that is, one that receives messages. Finally, we set the failure strategy to dead-letter-queue and define which topic will accumulate the dead-letter messages.

So it is time to write some Java code.

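import io.smallrye.mutiny.Uni;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

// Both methods live inside a CDI bean, for example one annotated with @ApplicationScoped.
@Incoming("products")
public Uni<Void> consumeProduct(Message<String> message) {
    return Uni.createFrom().item(message.getPayload())
            // An exception thrown by processItem becomes a failure of the Uni
            .onItem().invoke(product -> processItem(product))
            // call (unlike invoke) subscribes to the returned Uni, so the ack really runs
            .onItem().call(() -> Uni.createFrom().completionStage(message::ack))
            // On failure, nack the message so the failure strategy (the DLQ) is applied
            .onFailure().call(throwable ->
                    Uni.createFrom().completionStage(() -> message.nack(throwable)))
            // Mark the Uni as recovered and adapt it to the declared Uni<Void>
            .onFailure().recoverWithNull()
            .replaceWithVoid();
}

void processItem(String product) {
    if (product.contains("!")) {
        throw new RuntimeException("Illegal Character");
    }

    System.out.println("Processed " + product);
}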

We declare that we will be listening to the products channel using the @Incoming annotation. Then we choose to receive a Message, which is an object wrapping the Kafka message with extra data and actions, such as the ability to ack and nack the message. In fact, if you choose Message, Quarkus hands you the responsibility of saying manually, through these methods, when a message was completely processed or when there was a failure. Next we build our Uni to specify declaratively what our flow will do. In this case we take the payload and process it. Then we acknowledge the message in the happy case, and when there is an error we nack it, but after doing so we must mark the Uni as recovered. This is needed for the Quarkus health check to consider the channel healthy and keep delivering new messages after the failed one is committed and sent to the DLQ.
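If the order of events is hard to picture, here is a minimal sketch of the same flow written with only the JDK's CompletableFuture, so it can run without Quarkus or Kafka. It is an analogy, not the Mutiny or reactive messaging API: AckNackSketch, FakeMessage, consume and rootCause are hypothetical names made up for the illustration, and ack and nack here only record that they were called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;

class AckNackSketch {

    // Hypothetical stand-in for a Message: it only records acks and nacks.
    static class FakeMessage {
        final String payload;
        final List<String> events = new ArrayList<>();

        FakeMessage(String payload) {
            this.payload = payload;
        }

        CompletionStage<Void> ack() {
            events.add("ack");
            return CompletableFuture.completedFuture(null);
        }

        CompletionStage<Void> nack(Throwable reason) {
            events.add("nack: " + reason.getMessage());
            return CompletableFuture.completedFuture(null);
        }
    }

    static void processItem(String product) {
        if (product.contains("!")) {
            throw new RuntimeException("Illegal Character");
        }
    }

    // CompletableFuture wraps exceptions thrown by a stage in CompletionException.
    static Throwable rootCause(Throwable failure) {
        return (failure instanceof CompletionException && failure.getCause() != null)
                ? failure.getCause()
                : failure;
    }

    static CompletionStage<Void> consume(FakeMessage message) {
        CompletionStage<Void> happyPath = CompletableFuture
                .completedFuture(message.payload)
                .thenAccept(AckNackSketch::processItem)  // may throw
                .thenCompose(ignored -> message.ack());  // ack only on success

        // On failure: nack, then complete normally so the failure counts as recovered.
        return happyPath
                .handle((ignored, failure) -> failure)
                .thenCompose(failure -> {
                    if (failure == null) {
                        return CompletableFuture.<Void>completedFuture(null);
                    }
                    return message.nack(rootCause(failure));
                });
    }

    public static void main(String[] args) {
        FakeMessage good = new FakeMessage("book");
        consume(good).toCompletableFuture().join();
        System.out.println(good.events); // prints [ack]

        FakeMessage bad = new FakeMessage("book!");
        consume(bad).toCompletableFuture().join();
        System.out.println(bad.events);  // prints [nack: Illegal Character]
    }
}
```

Note that consume completes normally in both cases. That mirrors the recovery step described above: the failure has been handled by nacking, so nothing downstream sees an error.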

And that is it. It is undoubtedly a different way of thinking compared to the usual imperative processing used in Spring to handle messages, but it brings other benefits, such as keeping the thread unblocked and allowing the application to handle more requests with fewer resources when used well. Happy coding!
