Quarkus RabbitMQ Client
RabbitMQ is a popular message broker. This Quarkus extension provides a RabbitMQ client that is configured through application.properties.
Installation
To use this extension, first add the quarkus-rabbitmq-client dependency to your pom.xml file:
<dependency>
    <groupId>io.quarkiverse</groupId>
    <artifactId>quarkus-rabbitmq-client</artifactId>
    <version>0.1.0</version>
</dependency>
Usage
Assuming RabbitMQ is running on localhost:5672, add the following properties to your application.properties and fill in the values for <username> and <password>.
quarkus.rabbitmqclient.virtual-host=/
quarkus.rabbitmqclient.username=<username>
quarkus.rabbitmqclient.password=<password>
quarkus.rabbitmqclient.hostname=localhost
quarkus.rabbitmqclient.port=5672
Once you have configured the properties, you can start using the RabbitMQ client.
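import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// javax.* applies to Quarkus 2.x and earlier; Quarkus 3+ uses the jakarta.* namespace instead.
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import io.quarkiverse.rabbitmqclient.RabbitMQClient;
import io.quarkus.runtime.StartupEvent;

@ApplicationScoped
public class MessageService {
    private static final Logger log = LoggerFactory.getLogger(MessageService.class);

    @Inject
    RabbitMQClient rabbitMQClient;

    private Channel channel;

    public void onApplicationStart(@Observes StartupEvent event) {
        // on application start, prepare the queues and the message listener
        setupQueues();
        setupReceiving();
    }

    private void setupQueues() {
        try {
            // create a connection
            Connection connection = rabbitMQClient.connect();
            // create a channel
            channel = connection.createChannel();
            // declare a durable topic exchange and a durable queue
            channel.exchangeDeclare("sample", BuiltinExchangeType.TOPIC, true);
            channel.queueDeclare("sample.queue", true, false, false, null);
            // bind the queue to the exchange declared above; "#" matches every routing key
            channel.queueBind("sample.queue", "sample", "#");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private void setupReceiving() {
        try {
            // register an auto-acknowledging consumer for messages
            channel.basicConsume("sample.queue", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // just log the received message
                    log.info("Received: " + new String(body, StandardCharsets.UTF_8));
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public void send(String message) {
        try {
            // send a message to the same exchange the queue is bound to
            channel.basicPublish("sample", "#", null, message.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}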
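The queue in the example above is bound with the routing pattern "#", which on a topic exchange matches any routing key. To make that behavior concrete, here is a minimal, JDK-only sketch of topic pattern matching as AMQP describes it: words are separated by dots, "*" stands for exactly one word, and "#" stands for zero or more words. The class TopicMatchSketch and its matches method are hypothetical names used only for illustration; the broker performs this matching itself, and neither the extension nor the RabbitMQ Java client exposes such a helper.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: mimics how a topic exchange compares a binding
// pattern with a routing key. Not part of any RabbitMQ or Quarkus API.
final class TopicMatchSketch {

    static boolean matches(String bindingPattern, String routingKey) {
        List<String> pattern = Arrays.asList(bindingPattern.split("\\.", -1));
        List<String> key = Arrays.asList(routingKey.split("\\.", -1));
        return matches(pattern, 0, key, 0);
    }

    private static boolean matches(List<String> p, int i, List<String> k, int j) {
        if (i == p.size()) {
            // pattern exhausted: match only if the key is exhausted too
            return j == k.size();
        }
        String word = p.get(i);
        if (word.equals("#")) {
            // '#' may absorb zero or more words of the routing key
            for (int next = j; next <= k.size(); next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.size()) {
            // pattern still expects a word but the key has none left
            return false;
        }
        // '*' matches exactly one word; anything else must match literally
        if (word.equals("*") || word.equals(k.get(j))) {
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("#", "orders.created"));           // true
        System.out.println(matches("orders.*", "orders.created"));    // true
        System.out.println(matches("orders.*", "orders.created.eu")); // false
    }
}
```

With a "#" binding, the routing key passed to basicPublish does not affect delivery to this queue. A narrower binding such as "orders.*" (a hypothetical pattern) would deliver only keys with exactly two words starting with "orders".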
You do not need to close connections yourself; the RabbitMQClient closes them for you on application shutdown.
Extension Configuration Reference
Configuration property fixed at build time - All other configuration properties are overridable at runtime
Description | Type | Default |
---|---|---|
Enables health check | boolean | |
URI for connecting, formatted as amqp://userName:password@hostName:portNumber/virtualHost | string | |
Username for authentication | string | |
Password for authentication | string | |
Hostname for connecting | string | |
Virtual host | string | |
Port number for connecting | int | |
Connection timeout in milliseconds | int | |
Heartbeat interval in seconds | int | |
Handshake timeout in milliseconds | int | |
Shutdown timeout in milliseconds | int | |
Maximum number of channels per connection | int | |
Maximum frame size | int | |
Network recovery interval in milliseconds | int | |
Channel RPC timeout in milliseconds | int | |
Validate channel RPC response type | boolean | |
Recover connection on failure | boolean | |
Recover topology on failure | boolean | |
Client properties | | |

Description | Type | Default |
---|---|---|
Hostname for connecting | string | required |
Port number for connecting | int | |

Description | Type | Default |
---|---|---|
Enables TLS | boolean | |
TLS algorithm to use | string | |
Trust store file | string | |
Trust store type | string | |
Trust store algorithm | string | |
Trust store password | string | |
Key store file | string | |
Key store password | string | |
Key store type | string | |
Key store algorithm | string | |
Validate server certificate | boolean | |
Verify hostname | boolean | |

Description | Type | Default |
---|---|---|
Enables non-blocking IO | boolean | |
Read buffer size in bytes | int | |
Write buffer size in bytes | int | |
Number of non-blocking IO threads | int | |
Write enqueuing timeout in milliseconds | int | |
Write queue capacity | int | |
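The reference above lists a connection URI of the form amqp://userName:password@hostName:portNumber/virtualHost alongside separate username, password, hostname, port, and virtual host settings. As a rough illustration of how the two forms relate, the following JDK-only sketch splits such a URI into those parts. AmqpUriSketch and toProperties are hypothetical names, and the map keys simply mirror the suffixes used in the Usage section. The handling of the path follows the general RabbitMQ URI convention (the virtual host is the percent-decoded path without its leading slash, so the "/" virtual host is written as %2f) and the fallback to port 5672 is an assumption; how the extension itself parses the URI is not shown here.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: decomposes an amqp:// URI into the discrete settings.
// Not part of the extension or of the RabbitMQ Java client.
final class AmqpUriSketch {

    static Map<String, String> toProperties(String amqpUri) {
        URI uri = URI.create(amqpUri);
        Map<String, String> props = new LinkedHashMap<>();

        // user info is "userName:password"; the password part is optional
        String userInfo = uri.getUserInfo();
        if (userInfo != null) {
            int colon = userInfo.indexOf(':');
            props.put("username", colon < 0 ? userInfo : userInfo.substring(0, colon));
            if (colon >= 0) {
                props.put("password", userInfo.substring(colon + 1));
            }
        }

        props.put("hostname", uri.getHost());
        // assumption: fall back to the standard AMQP port when none is given
        int port = uri.getPort() == -1 ? 5672 : uri.getPort();
        props.put("port", String.valueOf(port));

        // getPath() percent-decodes, so "/%2f" becomes "//" and yields "/"
        String path = uri.getPath();
        if (path != null && !path.isEmpty()) {
            props.put("virtual-host", path.substring(1));
        }
        return props;
    }

    public static void main(String[] args) {
        // placeholder credentials, for illustration only
        System.out.println(toProperties("amqp://guest:secret@localhost:5672/%2f"));
    }
}
```

A URI without a path, such as amqp://localhost, leaves the virtual-host entry unset in this sketch, so the caller can apply whatever default is configured elsewhere.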