Quarkus RabbitMQ Client

RabbitMQ is a popular message broker. This Quarkus extension provides a client for RabbitMQ which is configurable using the application.properties.

Installation

If you want to use this extension, you need to add the quarkus-rabbitmq-client extension first. In your pom.xml file, add:

<dependency>
    <groupId>io.quarkiverse</groupId>
    <artifactId>quarkus-rabbitmq-client</artifactId>
    <version>0.1.0</version>
</dependency>

Usage

Assuming you have RabbitMQ running on localhost:5672 you should add the following properties to your application.properties and fill in the values for <username> and <password>.

quarkus.rabbitmqclient.virtual-host=/
quarkus.rabbitmqclient.username=<username>
quarkus.rabbitmqclient.password=<password>
quarkus.rabbitmqclient.hostname=localhost
quarkus.rabbitmqclient.port=5672

Once you have configured the properties, you can start using the RabbitMQ client.

@ApplicationScoped
public class MessageService {

    private static final Logger log = LoggerFactory.getLogger(MessageService.class);

    @Inject
    RabbitMQClient rabbitMQClient;

    private Channel channel;

    public void onApplicationStart(@Observes StartupEvent event) {
        // on application start prepare the queus and message listener
        setupQueues();
        setupReceiving();
    }

    private void setupQueues() {
        try {
            // create a connection
            Connection connection = rabbitMQClient.connect();
            // create a channel
            channel = connection.createChannel();
            // declare exchanges and queues
            channel.exchangeDeclare("sample", BuiltinExchangeType.TOPIC, true);
            channel.queueDeclare("sample.queue", true, false, false, null);
            channel.queueBind("sample.queue", "test", "#");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private void setupReceiving() {
        try {
            // register a consumer for messages
            channel.basicConsume("sample.queue", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // just print the received message.
                    log.info("Received: " + new String(body, StandardCharsets.UTF_8));
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public void send(String message) {
        try {
            // send a message to the exchange
            channel.basicPublish("test", "#", null, message.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

You do not need to worry about closing connections as the RabbitMQClient will close them for you on application shutdown.

Extension Configuration Reference

Configuration property fixed at build time - All other configuration properties are overridable at runtime

Configuration property

Type

Default

boolean

true

URI for connecting, formatted as amqp://userName:password@hostName:portNumber/virtualHost

string

Username for authentication

string

guest

Password for authentication

string

guest

Hostname for connecting

string

localhost

string

/

Port number for connecting

int

-1

Connection timeout in milliseconds

int

60000

Heartbeat interval in seconds

int

60

Handshake timeout in milliseconds

int

10000

Shutdown timeout in milliseconds

int

10000

Maximum number of channels per connection

int

2047

int

0

Network recovery interval in milliseconds

int

5000

Channel RPC timeout in milliseconds

int

600000

Validate channel RPC response type

boolean

false

Recover connection on failure

boolean

true

Recover topology on failure

boolean

true

Map<String,String>

Broker addresses for creating connections

Type

Default

string

required

int

0

Tls configuration

Type

Default

boolean

false

string

TLSv1.2

string

string

JKS

string

SunX509

string

string

string

string

PKCS12

string

SunX509

boolean

true

boolean

true

Non blocking IO configuration

Type

Default

Enables non blocking IO

boolean

false

int

32768

int

32768

Number of non blocking IO threads

int

1

Write enqueuing timeout in milliseconds

int

10000

int

10000