Using HTTP with Reactive Messaging

This guide demonstrates how your Quarkus application can utilize SmallRye Reactive Messaging to consume and produce HTTP messages.

Prerequisites

To complete this guide, you need:

  • less than 15 minutes

  • an IDE

  • JDK 17+ installed with JAVA_HOME configured appropriately

  • Apache Maven 3.8.1+

  • GraalVM, Docker or Podman installed if you want to run in native mode.

Architecture

In this guide we will implement a service, CostConverter, that consumes HTTP messages carrying costs in multiple currencies and converts each cost to its value in euros.

To let a user easily try out the service, we will implement an HTTP resource summing up the costs (CostCollector), and a simple web page to add new costs and watch the sum.

Solution

We recommend that you follow the instructions in the next sections and create the application step by step. However, you can go right to the completed example.

Clone the Git repository: git clone https://github.com/quarkiverse/quarkus-reactive-messaging-http.git, or download an archive.

The solution is located in the http-quickstart directory.

Creating the Maven Project

First, create a new project with the following command:

mvn io.quarkus.platform:quarkus-maven-plugin:3.8.2:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=reactive-messaging-http-quickstart \
    -Dextensions="reactive-messaging-http,resteasy-reactive-jackson" \
    -DnoCode
cd reactive-messaging-http-quickstart

This command generates a Maven project, importing the Reactive Messaging and HTTP connector extensions.

The Converter

Create the src/main/java/org/acme/reactivehttp/CostConverter.java file, with the following content:

package org.acme.reactivehttp;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.jboss.logging.Logger;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.HashMap;
import java.util.Map;

/**
 * A bean consuming costs in multiple currencies and producing prices in EUR from them
 */
@ApplicationScoped
public class CostConverter {

    private static final Logger log = Logger.getLogger(CostConverter.class);

    private static final Map<String, Double> conversionRatios = new HashMap<>();

    static {
        conversionRatios.put("CHF", 0.93);
        conversionRatios.put("USD", 0.84);
        conversionRatios.put("PLN", 0.22);
        conversionRatios.put("EUR", 1.);
    }

    @Incoming("incoming-costs") (1)
    @Outgoing("outgoing-costs") (2)
    double convert(Cost cost) { (3)
        Double conversionRatio = conversionRatios.get(cost.getCurrency().toUpperCase());
        if (conversionRatio == null) {
            return 0.;
        }
        return conversionRatio * cost.getValue();
    }
}
1 Consume messages from the incoming-costs stream.
2 Dispatch returned values to the outgoing-costs stream.
3 Consume an event with payload of type Cost and produce a double. In the case of consuming an arbitrary object, the reactive-messaging-http extension will attempt to deserialize the request body as a JSON object.
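
The conversion rule can also be tried in isolation. The following is a minimal plain-Java sketch and not part of the quickstart: the class name ConversionSketch and the method toEuro are hypothetical, and the ratios are copied from CostConverter. As in convert, an unknown currency yields 0.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-alone helper that follows CostConverter.convert,
// without any messaging annotations.
public class ConversionSketch {

    // Same ratios as in CostConverter.
    private static final Map<String, Double> RATIOS = Map.of(
            "CHF", 0.93,
            "USD", 0.84,
            "PLN", 0.22,
            "EUR", 1.0);

    // Returns the value in EUR, or 0 when the currency is not known.
    public static double toEuro(String currency, double value) {
        Double ratio = RATIOS.get(currency.toUpperCase(Locale.ROOT));
        return ratio == null ? 0.0 : ratio * value;
    }

    public static void main(String[] args) {
        System.out.println(toEuro("usd", 10.0)); // roughly 8.4
        System.out.println(toEuro("XYZ", 10.0)); // 0.0, unknown currency
    }
}
```

Returning 0 for an unknown currency keeps the stream flowing, at the price of silently adding nothing to the sum.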

Let’s define the Cost class:

package org.acme.reactivehttp;

public class Cost {
    private double value;
    private String currency;

    public double getValue() {
        return value;
    }

    public void setValue(double value) {
        this.value = value;
    }

    public String getCurrency() {
        return currency;
    }

    public void setCurrency(String currency) {
        this.currency = currency;
    }
}

In the next step, we will create configurations for both streams in the application.properties file.

Configuring the HTTP connector

We need to configure the HTTP connector. This is done in the application.properties file. The keys are structured as follows:

mp.messaging.[outgoing|incoming].<channel-name>.<property>=value

The channel-name segment must match the value set in the @Incoming and @Outgoing annotations:

  • incoming-costs → a source that receives costs

  • outgoing-costs → a sink that receives converted costs

mp.messaging.outgoing.outgoing-costs.connector=quarkus-http

# here we are using a URL pointing to an endpoint
# you can use e.g. an environment variable to change it
mp.messaging.outgoing.outgoing-costs.url=http://localhost:${quarkus.http.port}/cost-collector

# we need to use a different port for tests:
%test.mp.messaging.outgoing.outgoing-costs.url=http://localhost:${quarkus.http.test-port}/cost-collector

# POST is the default method. Another possibility is PUT
mp.messaging.outgoing.outgoing-costs.method=POST


mp.messaging.incoming.incoming-costs.connector=quarkus-http

# the incoming-costs channel will be fed via an endpoint on the `/costs` path
mp.messaging.incoming.incoming-costs.path=/costs

# POST is the default method. Another possibility is PUT
mp.messaging.incoming.incoming-costs.method=POST
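
To make the key structure concrete, here is a small sketch that splits a property key into its direction, channel name and property segments. The class ChannelKeySketch is hypothetical and exists only for illustration. It assumes that the channel name contains no dots and that the key carries no profile prefix such as %test.

```java
// Hypothetical helper, for illustration only: splits a key of the form
// mp.messaging.[outgoing|incoming].<channel-name>.<property> into its segments.
public class ChannelKeySketch {

    public final String direction; // "incoming" or "outgoing"
    public final String channel;   // must match the @Incoming / @Outgoing value
    public final String property;  // e.g. "connector", "path", "url", "method"

    private ChannelKeySketch(String direction, String channel, String property) {
        this.direction = direction;
        this.channel = channel;
        this.property = property;
    }

    public static ChannelKeySketch parse(String key) {
        // Limit of 5 keeps any dots in the property segment intact.
        String[] parts = key.split("\\.", 5);
        boolean wellFormed = parts.length == 5
                && parts[0].equals("mp")
                && parts[1].equals("messaging")
                && (parts[2].equals("incoming") || parts[2].equals("outgoing"));
        if (!wellFormed) {
            throw new IllegalArgumentException("Not a channel property key: " + key);
        }
        return new ChannelKeySketch(parts[2], parts[3], parts[4]);
    }

    public static void main(String[] args) {
        ChannelKeySketch key = parse("mp.messaging.incoming.incoming-costs.path");
        System.out.println(key.direction + " / " + key.channel + " / " + key.property);
        // prints "incoming / incoming-costs / path"
    }
}
```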

Broadcast to multiple subscribers

If a single channel should deliver each message to multiple subscribers (methods annotated with @Incoming), an additional property needs to be set:

mp.messaging.incoming.incoming-costs.broadcast=true

This delivers each message to all subscribers of that channel.

The CostCollector

To illustrate that converting messages and passing them through works, let’s add an endpoint that will receive the outgoing costs and sum them up. This is a regular JAX-RS endpoint.

package org.acme.reactivehttp;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;

@Path("/cost-collector")
@ApplicationScoped
public class CostCollector {

    private double sum = 0;

    @POST
    public void consumeCost(String valueAsString) {
        sum += Double.parseDouble(valueAsString);
    }

    @GET
    public double getSum() {
        return sum;
    }

}
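
The sum += ... update in consumeCost is not atomic. If POST requests happen to be handled on several threads at once, which is an assumption about the runtime and not something this guide states, some updates could be lost. A possible variant of the summing logic, sketched here under the hypothetical name CostSumSketch and without the JAX-RS annotations so that it compiles on its own, uses DoubleAdder from the JDK:

```java
import java.util.concurrent.atomic.DoubleAdder;

// Hypothetical variant of the summing logic only; the JAX-RS annotations
// are omitted so that the class compiles without Quarkus.
public class CostSumSketch {

    // DoubleAdder supports concurrent add() calls without losing updates.
    private final DoubleAdder sum = new DoubleAdder();

    public void consumeCost(String valueAsString) {
        sum.add(Double.parseDouble(valueAsString));
    }

    public double getSum() {
        return sum.sum();
    }

    public static void main(String[] args) throws InterruptedException {
        CostSumSketch collector = new CostSumSketch();
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    collector.consumeCost("1.0");
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        System.out.println(collector.getSum()); // prints 4000.0
    }
}
```

For a demo with a single browser tab, the plain double field is usually good enough.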

The HTML page

To conveniently interact with the application, let’s create a simple web page.

The page will provide a form to add costs and display the current sum of costs. The page periodically updates the sum by requesting the current sum from /cost-collector.

Create the src/main/resources/META-INF/resources/index.html file, with the following content:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Costs</title>

    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">

    <h2>Add a cost</h2>
    <div>
        <div>
            <label for="value">Value</label>
            <input type="text" id="value">
        </div>
        <div>
            <label for="currency">Currency</label>
            <select id="currency">
                <option value="CHF">Swiss franc</option>
                <option value="USD">United States dollar</option>
                <option value="PLN">Polish złoty</option>
            </select>
        </div>
        <input type="button" onclick="add()" value="Add">
    </div>


    <h2>Total cost</h2>
    <div class="row">
        <p class="col-md-12">The total cost is <strong><span id="content">N/A</span>&nbsp;&euro;</strong>.</p>
    </div>
</div>
</body>
<script>
    add = function() {
        var value = document.getElementById('value').value;
        var currency = document.getElementById('currency').value;

        var cost = {
            value: value,
            currency: currency
        };

        fetch('costs', { method: 'POST', body: JSON.stringify(cost) });
    }

    updateCost = function() {
        fetch('cost-collector').then(response => response.text()).then(sum =>
            document.getElementById('content').textContent=sum
        );
    }

    window.setInterval(updateCost, 500);
</script>
</html>

Get it running

Run the application using:

./mvnw quarkus:dev

Open http://localhost:8080/index.html in your browser.
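
To try the endpoints without the browser, a small client based on the JDK's java.net.http.HttpClient can post a cost and read the sum. This is a sketch under assumptions: the class CostClientSketch is hypothetical, the application is assumed to listen on localhost:8080, and the Content-Type header is an assumption, since the web page sends the JSON body without it.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical command-line client; assumes the application runs on localhost:8080.
public class CostClientSketch {

    // Builds a JSON body with the same field names as the web page uses.
    public static String toJson(double value, String currency) {
        return "{\"value\":" + value + ",\"currency\":\"" + currency + "\"}";
    }

    // POST request against the /costs path configured for the incoming-costs channel.
    public static HttpRequest costRequest(String baseUrl, double value, String currency) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/costs"))
                .header("Content-Type", "application/json") // assumption, see above
                .POST(HttpRequest.BodyPublishers.ofString(toJson(value, currency)))
                .build();
    }

    public static void main(String[] args) throws Exception {
        String baseUrl = "http://localhost:8080";
        HttpClient client = HttpClient.newHttpClient();

        HttpResponse<String> posted = client.send(
                costRequest(baseUrl, 10.0, "USD"),
                HttpResponse.BodyHandlers.ofString());
        System.out.println("POST /costs -> " + posted.statusCode());

        HttpRequest sumRequest = HttpRequest.newBuilder(URI.create(baseUrl + "/cost-collector"))
                .GET()
                .build();
        HttpResponse<String> sum = client.send(sumRequest, HttpResponse.BodyHandlers.ofString());
        System.out.println("Current sum: " + sum.body());
    }
}
```

Because the converted cost travels to /cost-collector as a separate HTTP request, the sum read immediately after the POST may not yet include the new cost.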

Running Native

You can build the native executable with:

./mvnw package -Pnative

Going further

HTTP connector options

All quarkus-http connector options:

# OUTGOING

# The target URL
mp.messaging.outgoing.<channelName>.url=http://localhost:8213

# Message payload serializer, optional, implementation of `io.quarkus.reactivemessaging.http.runtime.serializers.Serializer`
mp.messaging.outgoing.<channelName>.serializer=com.example.MySerializer

# The number of attempts to make for sending a request to a remote endpoint. Must not be less than zero
# Zero by default
mp.messaging.outgoing.<channelName>.maxRetries=3

# Configures the random factor when using back-off with maxRetries > 0. 0.5 by default
mp.messaging.outgoing.<channelName>.jitter=0.3

# Configures a back-off delay between attempts to send a request.
# A random factor (jitter) is applied to increase the delay when several failures happen.
mp.messaging.outgoing.<channelName>.delay=1s

# The HTTP method (either `POST` or `PUT`), `POST` by default
mp.messaging.outgoing.<channelName>.method=PUT

# INCOMING

# The HTTP method (either `POST` or `PUT`), `POST` by default
mp.messaging.incoming.<channelName>.method=POST

# The path of the endpoint
mp.messaging.incoming.<channelName>.path=/my-reactive-ws-endpoint

# HTTP endpoint buffers messages if a consumer is not able to keep up. This setting specifies the size of the buffer.
# 8 by default.
mp.messaging.incoming.<channelName>.buffer-size=13
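
To give a feel for how maxRetries, delay and jitter interact, the following sketch computes a generic exponential back-off with jitter. It is not the connector's implementation, whose exact schedule this guide does not describe. The class BackoffSketch, the doubling per attempt and the symmetric random offset are all assumptions made for illustration.

```java
import java.time.Duration;
import java.util.Random;

// Generic back-off illustration; NOT the connector's actual retry schedule.
public class BackoffSketch {

    // Delay before retry number `attempt` (1-based): base * 2^(attempt - 1),
    // shifted by a random offset of at most `jitter` times that value.
    public static Duration delayFor(int attempt, Duration base, double jitter, Random random) {
        long exponential = base.toMillis() * (1L << (attempt - 1));
        double offset = exponential * jitter * (2 * random.nextDouble() - 1);
        return Duration.ofMillis(Math.max(0L, Math.round(exponential + offset)));
    }

    public static void main(String[] args) {
        Random random = new Random();
        Duration base = Duration.ofSeconds(1); // corresponds to delay=1s
        double jitter = 0.3;                   // corresponds to jitter=0.3
        int maxRetries = 3;                    // corresponds to maxRetries=3
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            System.out.println("retry " + attempt + " after "
                    + delayFor(attempt, base, jitter, random).toMillis() + " ms");
        }
    }
}
```

With a jitter of 0 the delays in this sketch are exactly 1 s, 2 s and 4 s. A non-zero jitter spreads retries out so that several failing senders do not retry in lockstep.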

Cloud Event support

The HTTP connector supports binary-mode CloudEvents messages through Metadata.

If an incoming HTTP request contains headers prefixed with ce- or ce_, a CloudEventMetadata instance is included in the Message metadata.

If the outgoing Message metadata includes a CloudEventMetadata instance, the information it contains is added as headers prefixed with ce- to the outgoing HTTP request.
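
The header convention can be illustrated with plain Java. The sketch below is not the connector's code: the class CloudEventHeaderSketch is hypothetical and the header values are made up. It only shows how ce- and ce_ prefixed headers map to cloud event attribute names such as id, source, type and specversion.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical illustration of the ce- / ce_ header convention; not the connector's code.
public class CloudEventHeaderSketch {

    // Collects headers starting with "ce-" or "ce_" (case-insensitive)
    // and strips the prefix to obtain the attribute name.
    public static Map<String, String> attributes(Map<String, String> headers) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> header : headers.entrySet()) {
            String name = header.getKey().toLowerCase(Locale.ROOT);
            if (name.startsWith("ce-") || name.startsWith("ce_")) {
                result.put(name.substring(3), header.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("Content-Type", "application/json");
        headers.put("ce-specversion", "1.0");
        headers.put("ce-type", "org.acme.cost"); // made-up event type
        headers.put("ce-source", "/costs");      // made-up source
        headers.put("ce-id", "42");              // made-up id
        System.out.println(attributes(headers));
        // prints {specversion=1.0, type=org.acme.cost, source=/costs, id=42}
    }
}
```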

Reactive Messaging

This extension utilizes SmallRye Reactive Messaging to build data streaming applications.

If you want to go further, check the documentation of SmallRye Reactive Messaging, the implementation used in Quarkus.