Quarkus Messaginghub Pooled JMS

Quarkus extension for a JMS Connection pool and transaction manager integration for messaging applications supporting JMS 1.1 and 2.0 clients.

Installation

If you want to use this extension, you need to add the io.quarkiverse.messaginghub:quarkus-pooled-jms extension first. In your pom.xml file, add:

<dependency>
    <groupId>io.quarkiverse.messaginghub</groupId>
    <artifactId>quarkus-pooled-jms</artifactId>
    <version>2.3.1</version>
</dependency>

Pooling support

It is enabled by default. And we test for quarkus-artemis and quarkus-qpid-jms

clientID and durableSubscriptionName are not supported in pooling connections. If setClientID is called on a reused connection from the pool, an IllegalStateException will be thrown. You will get some error messages such like Cause: setClientID can only be called directly after the connection is created

XA transaction support

It needs to add quarkus-narayana-jta extension:

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-narayana-jta</artifactId>
</dependency>

and add the following configuration to your application.properties:

quarkus.pooled-jms.transaction=xa
quarkus.transaction-manager.enable-recovery=true

XA support is only available with quarkus-artemis-jms.

Custom ConnectionFactory

For those messaging drivers which do not have quarkus extension, such as ibmmq-client. You need to create a custom ConnectionFactory and wrap it by yourself. Here is an example:

@Produces
public ConnectionFactory createXAConnectionFactory(PooledJmsWrapper wrapper) {
    MQXAConnectionFactory mq = new MQXAConnectionFactory();
    try {
        mq.setHostName(ConfigProvider.getConfig().getValue("ibm.mq.host", String.class));
        mq.setPort(ConfigProvider.getConfig().getValue("ibm.mq.port", Integer.class));
        mq.setChannel(ConfigProvider.getConfig().getValue("ibm.mq.channel", String.class));
        mq.setQueueManager(ConfigProvider.getConfig().getValue("ibm.mq.queueManagerName", String.class));
        mq.setTransportType(WMQConstants.WMQ_CM_CLIENT);
        mq.setStringProperty(WMQConstants.USERID,
            ConfigProvider.getConfig().getValue("ibm.mq.user", String.class));
        mq.setStringProperty(WMQConstants.PASSWORD,
            ConfigProvider.getConfig().getValue("ibm.mq.password", String.class));
    } catch (Exception e) {
        throw new RuntimeException("Unable to create new IBM MQ connection factory", e);
    }
    return wrapper.wrapConnectionFactory(mq);
}

If you use ibmmq-client to consume messages and enable XA, you should make sure it is running in a transaction. Otherwise, you will get an error like MQRC_SYNCPOINT_NOT_AVAILABLE. When you are using ibmmq-client and rollback a transaction, there will be a WARN message like:

WARN  [com.arj.ats.jta] (executor-thread-1) ARJUNA016045: attempted rollback of < formatId=131077, gtrid_length=35, bqual_length=36, tx_uid=0:ffffc0a86510:aed3:650915d7:16, node_name=quarkus, branch_uid=0:ffffc0a86510:aed3:650915d7:1f, subordinatenodename=null, eis_name=0 > (com.ibm.mq.jmqi.JmqiXAResource@79786dde) failed with exception code XAException.XAER_NOTA: javax.transaction.xa.XAException: The method 'xa_rollback' has failed with errorCode '-4'.

it may be ignored and can be assumed that MQ has discarded the transaction’s work.

Local JMS Transaction support

It needs to set the following configuration:

quarkus.pooled-jms.transaction=enabled

and if it is running in a transaction, it can mark the session with SESSION_TRANSACTED and commit or rollback the session according to the outcome of the globe transaction. The example like sending message:

@Transactional
protected void send(String body, ConnectionFactory factory, String queueName) {
    try (JMSContext context = factory.createContext()) {
        JMSProducer producer = context.createProducer();
        producer.send(ActiveMQDestination.createQueue(queueName), body);

        log.info("send {}", body);
    }
}

It is different from the XA support. So if there is any issue happening during the session committing, the transaction will not be rollbacked.

Extension Configuration Reference

Configuration property fixed at build time - All other configuration properties are overridable at runtime

Configuration property

Type

Default

Whether to enable pooling capabilities for JMS connections.

Environment variable: QUARKUS_POOLED_JMS_POOLING_ENABLED

boolean

true

Whether to enable jakarta.jms.XAConnection support and integrate with jakarta.transaction.TransactionManager If you enable it, you need to include io.quarkus:quarkus-narayana-jta extension.

Environment variable: QUARKUS_POOLED_JMS_TRANSACTION

enabled, xa, disabled

disabled

Determines the maximum number of Connections the pool maintains in a single Connection pool (defaults to one).

Environment variable: QUARKUS_POOLED_JMS_MAX_CONNECTIONS

int

1

The idle timeout (default 30 seconds) controls how long a Connection that hasn’t been or currently isn’t loaned out to any client will remain idle in the Connection pool before it is eligible to be closed and discarded. To disable idle timeouts the value should be set to 0 or a negative number.

Environment variable: QUARKUS_POOLED_JMS_CONNECTION_IDLE_TIMEOUT

int

30

used to establish a periodic check for expired Connections which will close all Connection that have exceeded the set expiration value. This value is set to 0ms by default and only activates if set to a positive non-zero value.

Environment variable: QUARKUS_POOLED_JMS_CONNECTION_CHECK_INTERVAL

long

0

by default the JMS pool will use it’s own generic JMSContext classes to wrap a Connection borrowed from the pool instead of using the JMSContext functionality of the JMS ConnectionFactory that was configured. This generic JMSContext implementation may be limited compared to the Provider version and if that functionality is critical to the application this option can be enabled to force the pool to use the Provider JMSContext implementation. When enabled the JMSContext API is then not part of the Connections that are pooled by this JMS Connection pooling library.

Environment variable: QUARKUS_POOLED_JMS_USE_PROVIDER_JMS_CONTEXT

boolean

false

For each Connection in the pool there can be a configured maximum number of Sessions that the pooled Connection will loan out before either blocking or throwing an error (based on configuration). By default this value is 500 meaning that each provider Connection is limited to 500 sessions, this limit can be disabled by setting the value to a negative number.

Environment variable: QUARKUS_POOLED_JMS_MAX_SESSIONS_PER_CONNECTION

int

500

When true (default) a call to createSession on a Connection from the pool will block until another previously created and loaned out session is closed an thereby becomes available. When false a call to createSession when no Session is available will throw an IllegalStateException to indicate that the Connection is not able to provide a new Session at that time.

Environment variable: QUARKUS_POOLED_JMS_BLOCK_IF_SESSION_POOL_IS_FULL

boolean

true

When the blockIfSessionPoolIsFull option is enabled and this value is set then a call to createSession that has blocked awaiting a Session will wait for the specified number of milliseconds before throwing an IllegalStateException. By default this value is set to -1 indicating that the createSession call should block forever if configured to wait.

Environment variable: QUARKUS_POOLED_JMS_BLOCK_IF_SESSION_POOL_IS_FULL_TIMEOUT

int

-1

By default a Session that has been loaned out on a call to createSession will use a single anonymous JMS MessageProducer as the underlying producer for all calls to createProducer. In some rare cases this is not desirable and this feature can be disabled using this option, when disabled every call to createProducer will result in a new MessageProducer instance being created.

Environment variable: QUARKUS_POOLED_JMS_USE_ANONYMOUS_PRODUCERS

boolean

true