Using WebSockets with Reactive Messaging

This guide demonstrates how your Quarkus application can utilize SmallRye Reactive Messaging to consume and produce messages via WebSockets. WebSockets support is a part of the Reactive Messaging HTTP extension (quarkus-reactive-messaging-http).

Prerequisites

To complete this guide, you need:

  • less than 15 minutes

  • an IDE

  • JDK 17+ installed with JAVA_HOME configured appropriately

  • Apache Maven 3.8.1+

  • GraalVM, Docker or Podman installed if you want to run in native mode.

Architecture

In this guide, we will implement a service, CostConverter, that consumes messages with costs in multiple currencies and converts each cost to its value in euros.

To let a user easily try out the service, we will implement an HTTP resource summing up the costs (CostCollector), and a simple web page to add new costs and watch the sum.

Solution

We recommend that you follow the instructions in the next sections and create the application step by step. However, you can go right to the completed example.

Clone the Git repository: git clone https://github.com/quarkiverse/quarkus-reactive-messaging-http.git, or download an archive.

The solution is located in the websockets-quickstart directory.

Creating the Maven Project

First, we need a new project. Create a new project with the following command:

mvn io.quarkus:quarkus-maven-plugin:3.8.2:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=reactive-messaging-websockets-quickstart \
    -Dextensions="reactive-messaging-http,resteasy-reactive-jackson" \
    -DnoCode
cd reactive-messaging-websockets-quickstart

This command generates a Maven project with two extensions: the Reactive Messaging HTTP connector and RESTEasy Reactive with Jackson support.

The Converter

Create the src/main/java/org/acme/reactivews/CostConverter.java file, with the following content:

package org.acme.reactivews;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.HashMap;
import java.util.Map;

/**
 * A bean consuming costs in multiple currencies and producing costs in EUR from them
 */
@ApplicationScoped
public class CostConverter {

    private static final Map<String, Double> conversionRatios = new HashMap<>();

    static {
        conversionRatios.put("CHF", 0.93);
        conversionRatios.put("USD", 0.84);
        conversionRatios.put("PLN", 0.22);
        conversionRatios.put("EUR", 1.0);
    }

    @Incoming("incoming-costs") // (1)
    @Outgoing("outgoing-costs") // (2)
    Cost convert(Cost cost) { // (3)
        Double conversionRatio = conversionRatios.get(cost.getCurrency().toUpperCase());
        if (conversionRatio == null) {
            return cost;
        }
        return new Cost(conversionRatio * cost.getValue(), "EUR");
    }
}
1 Consume messages from the incoming-costs stream.
2 Dispatch returned values to the outgoing-costs stream.
3 Consume an event with a payload of type Cost and produce another Cost, with the value converted to euros if the conversion ratio is known; otherwise the cost is returned unchanged. When a method consumes an arbitrary object, the reactive-messaging-http extension attempts to deserialize the message body as a JSON object.

Unlike the outbound HTTP connector, the outbound WebSocket connector does not check whether the message is consumed by the remote endpoint. A failure to receive a message may therefore go unreported, e.g. if the remote side closes the WebSocket connection at a critical moment.
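
The conversion rule described in callout 3 can be tried out without any messaging infrastructure. The following is a minimal sketch in plain Java, not part of the quickstart: the class name ConversionSketch and its nested Cost stand-in are assumptions made for illustration, while the ratios are copied from CostConverter.

```java
import java.util.Map;

public class ConversionSketch {

    // Same ratios as in CostConverter
    static final Map<String, Double> RATIOS =
            Map.of("CHF", 0.93, "USD", 0.84, "PLN", 0.22, "EUR", 1.0);

    // Minimal stand-in for the Cost payload, for illustration only
    static final class Cost {
        final double value;
        final String currency;

        Cost(double value, String currency) {
            this.value = value;
            this.currency = currency;
        }
    }

    // Mirrors CostConverter.convert: unknown currencies pass through unchanged
    static Cost convert(Cost cost) {
        Double ratio = RATIOS.get(cost.currency.toUpperCase());
        if (ratio == null) {
            return cost;
        }
        return new Cost(ratio * cost.value, "EUR");
    }

    public static void main(String[] args) {
        Cost converted = convert(new Cost(10.0, "usd"));
        System.out.println(converted.value + " " + converted.currency);

        Cost unknown = convert(new Cost(5.0, "GBP"));
        System.out.println(unknown.value + " " + unknown.currency);
    }
}
```

Note that a cost in an unknown currency keeps its original currency code, so a downstream consumer has to decide how to treat it.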

Let’s define the Cost class:

package org.acme.reactivews;

public class Cost {
    private double value;
    private String currency;

    public Cost(double value, String currency) {
        this.value = value;
        this.currency = currency;
    }

    public Cost() {
    }

    public void setValue(double value) {
        this.value = value;
    }

    public void setCurrency(String currency) {
        this.currency = currency;
    }

    public double getValue() {
        return value;
    }

    public String getCurrency() {
        return currency;
    }
}

In the next step, we will create configurations for both streams in the application.properties file.

Configuring the WebSocket connector

We need to configure the WebSocket connector. This is done in the application.properties file. The keys are structured as follows:

mp.messaging.[outgoing|incoming].{channel-name}.{property}=value

The channel-name segment must match the value set in the @Incoming and @Outgoing annotations:

  • incoming-costs → an inbound WebSocket that receives costs

  • outgoing-costs → an outbound WebSocket that pushes converted costs

mp.messaging.outgoing.outgoing-costs.connector=quarkus-websocket

# the WebSockets are exposed on the same port as HTTP
# for non-test profiles, it is quarkus.http.port...
mp.messaging.outgoing.outgoing-costs.url=ws://localhost:${quarkus.http.port}/cost-collector

# for the test profile it is quarkus.http.test-port
%test.mp.messaging.outgoing.outgoing-costs.url=ws://localhost:${quarkus.http.test-port}/cost-collector


mp.messaging.incoming.incoming-costs.connector=quarkus-websocket
# the incoming-costs channel will be fed via a WebSocket endpoint on the `/costs` path
mp.messaging.incoming.incoming-costs.path=/costs

mp.messaging.incoming.collector.path=/cost-collector
mp.messaging.incoming.collector.connector=quarkus-websocket
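
To make the key structure concrete, here is a small sketch in plain Java that splits a key into its direction, channel name, and property. It is only an illustration of the naming pattern, not how the configuration is actually read at runtime; the class ChannelKeySketch is hypothetical, and the sketch assumes channel names without dots and keys without a profile prefix such as %test.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ChannelKeySketch {

    // mp.messaging.[outgoing|incoming].{channel-name}.{property}
    static final Pattern KEY =
            Pattern.compile("mp\\.messaging\\.(incoming|outgoing)\\.([^.]+)\\.(.+)");

    // Returns {direction, channel name, property}, or null if the key does not match
    static String[] parse(String key) {
        Matcher m = KEY.matcher(key);
        if (!m.matches()) {
            return null;
        }
        return new String[] { m.group(1), m.group(2), m.group(3) };
    }

    public static void main(String[] args) {
        String[] parts = parse("mp.messaging.incoming.incoming-costs.path");
        System.out.println(String.join(" | ", parts));
    }
}
```

The channel name extracted here is the string that must appear in the corresponding @Incoming or @Outgoing annotation.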

The CostCollector

To illustrate that converting messages and passing them through works, let’s add a CostCollector class that consumes the WebSocket messages and exposes the sum of the collected costs via REST:

package org.acme.reactivews;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;

@Path("/collected-costs")
@ApplicationScoped
public class CostCollector {
    private double sum;

    @GET
    // expose the sum of the collected costs
    public synchronized double getCosts() {
        return sum;
    }

    @Incoming("collector")
    // consume costs from collector channel
    synchronized void collect(Cost cost) {
        if ("EUR".equals(cost.getCurrency())) {
            sum += cost.getValue();
        }
    }
}
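
Both methods of CostCollector are synchronized because messages and REST requests may be handled on different threads. The following plain Java sketch shows the same pattern in isolation; CollectorSketch and its runConcurrently helper are hypothetical names introduced only for this illustration.

```java
public class CollectorSketch {

    private double sum;

    // Same guard as CostCollector.collect: only EUR amounts are added
    synchronized void collect(double value, String currency) {
        if ("EUR".equals(currency)) {
            sum += value;
        }
    }

    synchronized double getCosts() {
        return sum;
    }

    // Hypothetical helper: feeds one collector from several threads at once
    static double runConcurrently(int threads, int perThread) {
        CollectorSketch collector = new CollectorSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    collector.collect(1.0, "EUR");
                    collector.collect(1.0, "USD"); // ignored: not in EUR
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return collector.getCosts();
    }

    public static void main(String[] args) {
        System.out.println(runConcurrently(4, 1000));
    }
}
```

Without synchronized, concurrent updates to the sum field could be lost, and a reader could observe a stale value.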

The collector channel must also be configured in application.properties. If you have not added these lines already, make sure they are present:

mp.messaging.incoming.collector.path=/cost-collector
mp.messaging.incoming.collector.connector=quarkus-websocket

The HTML page

To conveniently interact with the application, let’s create a simple web page.

The page will provide a form to add costs and display the current sum of costs. The page periodically updates the sum by requesting the current value from /collected-costs.

Create the src/main/resources/META-INF/resources/index.html file (or replace it if it already exists) with the following content:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Costs</title>

    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">

    <h2>Add a cost</h2>
    <div>
        <div>
            <label for="value">Value</label>
            <input type="text" id="value">
        </div>
        <div>
            <label for="currency">Currency</label>
            <select id="currency">
                <option value="CHF">Swiss franc</option>
                <option value="USD">United States dollar</option>
                <option value="PLN">Polish złoty</option>
                <option value="EUR">Euro</option>
            </select>
        </div>
        <input type="button" onclick="add()" value="Add">
    </div>


    <h2>Summary</h2>
    <div class="row">
        <p class="col-md-12">The total cost is <strong><span id="content">N/A</span>&nbsp;&euro;</strong>.</p>
    </div>
</div>
</body>
<script>
    var webSocket = new WebSocket(`ws://${document.location.host}/costs`);
    add = function() {
        const cost = {
            value: document.getElementById('value').value,
            currency: document.getElementById('currency').value
        };

        webSocket.send(JSON.stringify(cost));
    }

    updateCost = function() {
        fetch('collected-costs').then(response => response.text()).then(sum =>
            document.getElementById('content').textContent=sum
        );
    }

    window.setInterval(updateCost, 500);
</script>
</html>

Get it running

Run the application using:

./mvnw quarkus:dev

Open http://localhost:8080/index.html in your browser.
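
If you prefer to exercise the endpoints without a browser, the following is a minimal sketch of a standalone client that uses only the JDK's java.net.http API. It assumes the application is running in dev mode on localhost:8080; the class name CostClientSketch, the costJson helper, and the 500 ms wait are assumptions made for illustration, and the helper does not escape special characters in the currency code.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.WebSocket;
import java.util.Locale;

public class CostClientSketch {

    // Builds the same JSON shape the web page sends
    static String costJson(double value, String currency) {
        return String.format(Locale.ROOT, "{\"value\":%s,\"currency\":\"%s\"}", value, currency);
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // Connect to the inbound WebSocket endpoint configured on /costs
        WebSocket webSocket = client.newWebSocketBuilder()
                .buildAsync(URI.create("ws://localhost:8080/costs"), new WebSocket.Listener() { })
                .join();
        webSocket.sendText(costJson(12.5, "USD"), true).join();

        // Give the message time to pass through the converter and the collector
        Thread.sleep(500);

        // Read the current sum from the REST resource
        HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:8080/collected-costs"))
                .GET()
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("Collected costs in EUR: " + response.body());

        webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "done").join();
    }
}
```

Because the outbound WebSocket connector does not confirm delivery, the fixed wait is only a heuristic; the sum may not yet include the cost that was just sent.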

Running Native

You can build the native executable with:

./mvnw package -Pnative

Going further

WebSockets

All quarkus-websocket connector options:

# OUTGOING

# The target URL
mp.messaging.outgoing.<channelName>.url=ws://localhost:8234/

# Message serializer, optional, implementation of `io.quarkus.reactivemessaging.http.runtime.serializers.Serializer`
mp.messaging.outgoing.<channelName>.serializer=com.example.MySerializer

# The number of retries to make for sending a message to a remote websocket endpoint.
# A value greater than 0 is advised. Otherwise, a WebSocket timeout can result in a dropped message.
# The default value is 1
mp.messaging.outgoing.<channelName>.maxRetries=1

# Configures the random factor applied to the back-off delay when retries are enabled, 0.5 by default
mp.messaging.outgoing.<channelName>.jitter=0.7

# Configures a back-off delay between attempts to send a request.
# A random factor (jitter) is applied to increase the delay when several failures happen.
mp.messaging.outgoing.<channelName>.delay=2s


# INCOMING

# The path of the endpoint
mp.messaging.incoming.<channelName>.path=/my-ws-endpoint

# The WebSocket endpoint buffers messages if a consumer is not able to keep up.
# This setting specifies the size of the buffer. The default is 8.
mp.messaging.incoming.<channelName>.buffer-size=3
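
To give an intuition for how the delay and jitter options interact, here is a sketch of one common way to compute a randomized exponential back-off. This is an assumption for illustration and not necessarily the formula the connector uses; the class BackoffSketch and the method delayFor are hypothetical.

```java
import java.time.Duration;
import java.util.Random;

public class BackoffSketch {

    // Delay before the given retry attempt (1-based).
    // Assumed formula: base * 2^(attempt - 1), scaled by a random factor in [1 - jitter, 1 + jitter)
    static Duration delayFor(int attempt, Duration base, double jitter, Random random) {
        double exponential = base.toMillis() * Math.pow(2, attempt - 1);
        double factor = 1.0 + jitter * (2 * random.nextDouble() - 1.0);
        return Duration.ofMillis(Math.round(exponential * factor));
    }

    public static void main(String[] args) {
        Random random = new Random();
        Duration base = Duration.ofSeconds(2); // corresponds to delay=2s
        double jitter = 0.7;                   // corresponds to jitter=0.7
        for (int attempt = 1; attempt <= 3; attempt++) {
            System.out.println("attempt " + attempt + ": " + delayFor(attempt, base, jitter, random));
        }
    }
}
```

Under this assumed formula, a larger jitter spreads retries from different senders further apart, which reduces the chance that they all hit the remote endpoint at the same moment.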

Reactive Messaging

This extension utilizes SmallRye Reactive Messaging to build data streaming applications.

If you want to go further, check the documentation of SmallRye Reactive Messaging, the implementation used in Quarkus.