Quarkus Zeebe

Quarkus Zeebe extension. Zeebe’s cloud-native design provides the performance, resilience, and security enterprises need to future-proof their process orchestration efforts.

Installation

To use this extension, add the io.quarkiverse.zeebe:quarkus-zeebe dependency to your build file.

For instance, with Maven, add the following dependency to your POM file:

<dependency>
    <groupId>io.quarkiverse.zeebe</groupId>
    <artifactId>quarkus-zeebe</artifactId>
    <version>0.8.0</version>
</dependency>

Upgrade

In version >=0.8.0 we replaced the @ZeebeWorker annotation with @JobWorker.
In version >=0.7.0 we removed the hazelcast dependency and zeebe-simple-monitor from the test and dev services. They now use zeebe-test-container with the debug exporter and zeebe-dev-monitor. In the test module we removed our own assert API and switched to Camunda BpmnAssert from zeebe-process-test. Test API migration: io.quarkiverse.zeebe.test.BpmnAssert -> io.camunda.zeebe.process.test.assertions.BpmnAssert

Configuration

Exemplary Setup for your local development

Generally speaking, there are three ways to configure your Quarkus project to talk to Camunda:

  • Local dev instance with dev services

  • Shared local dev instance

  • Direct interaction with Camunda SaaS / on-premise

You can see some exemplary configurations for each of the setups below. Please note that these are only exemplary and can be adapted to your needs.

Local dev instance with dev services

# enable auto load bpmn resources
quarkus.zeebe.resources.enabled=true
# src/main/resources/bpmn
quarkus.zeebe.resources.location=bpmn
# Enable zeebe Dev Service:
quarkus.zeebe.devservices.enabled=true
# only start devservices, if no running docker container is found
quarkus.zeebe.devservices.shared=true
# zeebe service name
quarkus.zeebe.devservices.service-name=zeebe_broker
# enable reusable zeebe test-container (https://www.testcontainers.org/features/reuse/)
quarkus.zeebe.devservices.reuse=false
# enable zeebe monitor Dev Service:
quarkus.zeebe.devservices.monitor.enabled=true
# zeebe monitor service name
quarkus.zeebe.devservices.monitor.service-name=zeebe-dev-monitor
# enable reusable zeebe test-container (https://www.testcontainers.org/features/reuse/)
quarkus.zeebe.devservices.monitor.reuse=false
# enable hot restart for bpmn subdirectories changes
quarkus.zeebe.dev-mode.watch-bpmn-dir=true
# enable hot restart for bpmn files changes
quarkus.zeebe.dev-mode.watch-bpmn-files=true
# enable hot restart for job worker changes
quarkus.zeebe.dev-mode.watch-job-worker=true

Shared local dev instance

quarkus.zeebe.client.broker.gateway-address=localhost:26500
# If you are sure that an instance is already running, you can disable dev services directly
quarkus.zeebe.devservices.enabled=false
quarkus.zeebe.devservices.shared=true
quarkus.zeebe.devservices.monitor.service-name=zeebe-dev-monitor
quarkus.zeebe.devservices.service-name=zeebe_broker

Direct interaction with Camunda Cloud live instance

Preferably you would be using a dev instance of Camunda and not your production process engine ;)

# Disable local dev services
quarkus.zeebe.devservices.enabled=false

# Enter your cloud credentials from the zeebe portal
quarkus.zeebe.client.broker.gateway-address=
# Specify a cluster id.
quarkus.zeebe.client.cloud.cluster-id=
# Specify a client id.
quarkus.zeebe.client.cloud.client-id=
# Specify a client secret to request an access token.
quarkus.zeebe.client.cloud.client-secret=
# Specify a cloud region.
quarkus.zeebe.client.cloud.region=
quarkus.zeebe.client.cloud.base-url=zeebe.camunda.io
quarkus.zeebe.client.cloud.auth-url=https://login.cloud.camunda.io/oauth/token
quarkus.zeebe.client.cloud.port=443

# Make sure you are disabling plaintext security, otherwise connection will fail
quarkus.zeebe.client.security.plaintext=false

Metrics

The build-time property quarkus.zeebe.metrics.enabled controls whether Zeebe metrics are enabled. The default is true; it is shown here only to indicate where it can be disabled.

quarkus.zeebe.metrics.enabled=true

You can access the following metrics:

  • camunda.job.invocations: Number of invocations of job workers, tagged with the job type and one of the following actions:

  • activated: The job was activated and started to process an item

  • completed: The processing was completed successfully

  • failed: The processing failed with some exception

  • bpmn-error: The processing completed by throwing a ZeebeBpmnError (which means there was no technical problem)

Example:

# TYPE camunda_job_invocations counter
# HELP camunda_job_invocations
camunda_job_invocations_total{action="activated",type="gateway-empty-data"} 1.0
camunda_job_invocations_total{action="completed",type="gateway-show-data"} 2.0
camunda_job_invocations_total{action="activated",type="gateway-show-data"} 2.0
camunda_job_invocations_total{action="completed",type="gateway-read-data"} 2.0

Micrometer

If you already have your Quarkus project configured, you can add the quarkus-micrometer-registry-prometheus extension to your project.

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-micrometer-registry-prometheus</artifactId>
</dependency>

Smallrye Metrics (Microprofile)

If you already have your Quarkus project configured, you can add the quarkus-smallrye-metrics extension to your project.

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-smallrye-metrics</artifactId>
</dependency>

Tracing

The build-time property quarkus.zeebe.tracing.enabled controls whether Zeebe tracing is enabled. The default is true; it is shown here only to indicate where it can be disabled.

quarkus.zeebe.tracing.enabled=true

OpenTelemetry

If you already have your Quarkus project configured, you can add the quarkus-opentelemetry-exporter-otlp extension to your project.

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-opentelemetry-exporter-otlp</artifactId>
</dependency>

OpenTracing

If you already have your Quarkus project configured, you can add the quarkus-smallrye-opentracing extension to your project.

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-smallrye-opentracing</artifactId>
</dependency>

Dev-Services

Dev Services for Zeebe is automatically enabled unless:

  • quarkus.zeebe.devservices.enabled is set to false

  • quarkus.zeebe.client.broker.gateway-address is configured

Dev Services for Zeebe relies on Docker to start the broker. If your environment does not support Docker, you will need to start the broker manually, or connect to an already running broker. You can configure the broker address using quarkus.zeebe.client.broker.gateway-address.
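
The activation rule above can be read as a simple predicate. The following is a minimal sketch in plain Java, not the extension's actual code: the class and method names are hypothetical, the gateway address key is assumed to be the one used in the configuration examples of this guide, and treating a blank value as "not configured" is also an assumption.

```java
import java.util.Map;

final class DevServicesRule {
    static final String ENABLED = "quarkus.zeebe.devservices.enabled";
    // Assumption: same key as in the configuration examples of this guide.
    static final String GATEWAY = "quarkus.zeebe.client.broker.gateway-address";

    /** Returns true when Dev Services would be expected to start a broker. */
    static boolean shouldStart(Map<String, String> config) {
        // Dev Services are on by default, so a missing key counts as "true".
        boolean enabled = Boolean.parseBoolean(config.getOrDefault(ENABLED, "true"));
        String gateway = config.get(GATEWAY);
        // Assumption: a blank gateway address is treated as not configured.
        boolean gatewayConfigured = gateway != null && !gateway.isBlank();
        return enabled && !gatewayConfigured;
    }

    public static void main(String[] args) {
        System.out.println(shouldStart(Map.of()));
        System.out.println(shouldStart(Map.of(ENABLED, "false")));
        System.out.println(shouldStart(Map.of(GATEWAY, "localhost:26500")));
    }
}
```

Only the first call reports that a broker would be started; the other two correspond to the two exceptions listed above.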

Test

To activate Zeebe-Dev-Monitor Dev Service use this configuration:

quarkus.zeebe.devservices.enabled=true
quarkus.zeebe.devservices.monitor.enabled=true

The property quarkus.zeebe.devservices.monitor.enabled=true will activate the debug exporter.

Usage

@JobWorker

You need to configure the job type via the JobWorker annotation:

@JobWorker(type = "my-job")
public void executeMyJob() {
    // handles jobs of type 'my-job'
}

If you don’t specify the type, the method name is used as the default:

@JobWorker
public void test() {
    // handles jobs of type 'test'
}

Or you can set a default job type:

quarkus.zeebe.client.job.default-type=test

Variables

You can specify that you only want to fetch some of the variables when executing a job, which can decrease load and improve performance:

@JobWorker(type="test", fetchVariables={"var1", "var2"})
public void test(final JobClient client, final ActivatedJob job) {
    String var1 = (String) job.getVariablesAsMap().get("var1");
    System.out.println(var1);
    // ...
}

@Variable

By using the @Variable annotation there is a shortcut to make variable retrieval simpler, including the type cast:

@JobWorker(type="test")
public void test(final JobClient client, final ActivatedJob job, @Variable String var1) {
    System.out.println(var1);
    // ...
}

With @Variable or fetchVariables you limit which variables are loaded from the workflow engine. You can also override this with fetchAllVariables and force that all variables are loaded anyway:

@JobWorker(type="test", fetchAllVariables=true)
public void test(final JobClient client, final ActivatedJob job) {
    String var1 = (String) job.getVariablesAsMap().get("var1");
    System.out.println(var1);
    // ...
}
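
To make the interplay of fetchVariables and fetchAllVariables concrete, here is a small plain-Java model of which variables a worker would see. It is a sketch under assumptions, not the extension's code: the class and method names are hypothetical, and it assumes that an empty fetch list means "fetch everything".

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class FetchVariablesSketch {
    /**
     * Models the variables visible to a worker.
     * fetchAllVariables overrides any restriction given by fetchVariables.
     */
    static Map<String, Object> visibleVariables(Map<String, Object> processVariables,
                                                Set<String> fetchVariables,
                                                boolean fetchAllVariables) {
        // Assumption: no restriction (empty set) behaves like fetching all variables.
        if (fetchAllVariables || fetchVariables.isEmpty()) {
            return processVariables;
        }
        Map<String, Object> visible = new TreeMap<>();
        for (String name : fetchVariables) {
            if (processVariables.containsKey(name)) {
                visible.put(name, processVariables.get(name));
            }
        }
        return visible;
    }

    public static void main(String[] args) {
        Map<String, Object> all = Map.of("var1", "a", "var2", "b", "var3", "c");
        System.out.println(visibleVariables(all, Set.of("var1", "var2"), false).keySet());
        System.out.println(visibleVariables(all, Set.of("var1"), true).size());
    }
}
```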

@VariablesAsType

You can also map the process variables into your own class (comparable with getVariablesAsType() in the Java Client API). To do so, use the @VariablesAsType annotation. In the example below, Parameter refers to your own class:

@JobWorker(type = "job1")
public Parameter job1(@VariablesAsType Parameter p) {
    System.out.println(p.getValue());
    p.setValue(1);
    // ... custom code
    // the returned object is added to the process variables
    return p;
}
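
The Parameter class itself is not shown in this guide. A minimal sketch of what it could look like follows; the single int field named value is an assumption derived from the getValue() and setValue(1) calls above, and the mapping of a process variable named value onto it assumes the usual name-based JSON mapping.

```java
/**
 * Hypothetical variables class for the job1 worker.
 * In a real project this would normally be a public class in its own file.
 */
final class Parameter {
    // Assumed to correspond to a process variable named "value".
    private int value;

    // A no-argument constructor keeps the class friendly to JSON mappers.
    Parameter() {
    }

    public int getValue() {
        return value;
    }

    public void setValue(int value) {
        this.value = value;
    }

    public static void main(String[] args) {
        Parameter p = new Parameter();
        p.setValue(1);
        System.out.println(p.getValue());
    }
}
```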

Fetch variables via Job

You can access variables of a process via the ActivatedJob object, which is passed into the method if it is a parameter:

@JobWorker(type="test")
public void test(final ActivatedJob job) {
    String var1 = (String) job.getVariablesAsMap().get("var1");
    System.out.println(var1);
    // ...
}

@CustomHeader

You can use the @CustomHeader annotation on a parameter to retrieve a custom header of a job:

@JobWorker(type = "job1")
public Parameter job1(@VariablesAsType Parameter p, @CustomHeader String header1, @CustomHeader("custom-header") String header2) {
    System.out.println(header1);
    System.out.println(header2);
    System.out.println(p.getValue());
    p.setValue(1);
    // ... custom code
    return p;
}

@CustomHeaders

You can use the @CustomHeaders annotation for a parameter to retrieve custom headers for a job:

@JobWorker(type = "job1")
public void job1(@VariablesAsType Parameter p, @CustomHeaders Map<String, String> headers) {
    System.out.println(headers.get("header1"));
    System.out.println(headers.get("custom-header"));
    System.out.println(p.getValue());
    p.setValue(1);
    // ... custom code
}

Auto-completing jobs

By default, the autoComplete attribute is set to true for any job worker. In this case, the Quarkus extension will take care of job completion for you:

@JobWorker(type = "job1")
public void job1(final ActivatedJob job) {
    // ... custom code ...
    // no need to call client.newCompleteCommand()...
}

Note that the code within the handler method needs to be synchronously executed, as the completion will be triggered right after the method has finished.

When using autoComplete you can:

  • Return a Map, String, InputStream, or Object, which will then be added to the process variables

  • Throw a ZeebeBpmnError, which results in a BPMN error being sent to Zeebe

  • Throw any other Exception, which leads to a failure handed over to Zeebe

@JobWorker(type = "job1")
public Map<String, Object> job1(final ActivatedJob job) {
    // ... custom code ...
    if (ok) {
       return responseMap;
    } else {
       throw new ZeebeBpmnError("Error code", "Error message");
    }
}
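
The three outcomes listed above can be pictured as a small dispatch step around the handler method. The following plain-Java model is only an illustration of that idea, not the extension's implementation: AutoCompleteSketch, BpmnErrorStandIn (a stand-in for ZeebeBpmnError) and dispatch are hypothetical names, and the returned strings merely label which kind of command would be sent.

```java
import java.util.Map;
import java.util.function.Supplier;

final class AutoCompleteSketch {
    /** Hypothetical stand-in for ZeebeBpmnError; the real class comes from the extension. */
    static final class BpmnErrorStandIn extends RuntimeException {
        final String errorCode;

        BpmnErrorStandIn(String errorCode, String message) {
            super(message);
            this.errorCode = errorCode;
        }
    }

    /** Runs the handler and describes which command its outcome maps to. */
    static String dispatch(Supplier<Map<String, Object>> handler) {
        try {
            Map<String, Object> result = handler.get();
            return "complete " + result;          // returned value becomes process variables
        } catch (BpmnErrorStandIn e) {
            return "throw-error " + e.errorCode;  // BPMN error, no technical problem
        } catch (RuntimeException e) {
            return "fail " + e.getMessage();      // any other exception fails the job
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatch(() -> Map.of("approved", true)));
        System.out.println(dispatch(() -> { throw new BpmnErrorStandIn("ERR_CODE", "rejected"); }));
        System.out.println(dispatch(() -> { throw new IllegalStateException("boom"); }));
    }
}
```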

Programmatically completing jobs

Your job worker code can also complete the job itself. This gives you more control over when exactly you want to complete the job (e.g. allowing the completion to be moved to reactive callbacks):

@JobWorker(type = "job1", autoComplete = false)
public void job1(final JobClient client, final ActivatedJob job) {
    // ... custom code ...
    // autoComplete is disabled, so the worker sends the complete command itself
    client.newCompleteCommand(job.getKey()).send()
        .exceptionally(throwable -> { throw new RuntimeException("Could not complete job " + job, throwable); });
}

Ideally, you don’t use blocking behavior like send().join(), as this is a blocking call that waits for the issued command to be executed on the workflow engine. While this is very straightforward to use and produces easy-to-read code, blocking code is limited in terms of scalability.

That’s why the worker above shows a different pattern (using exceptionally). Often you might also want to use the whenComplete callback:

 client.newCompleteCommand(job.getKey()).send()
        .whenComplete((result, exception) -> {});

This registers a callback to be executed if the command on the workflow engine was executed or resulted in an exception. This allows for parallelism. This is discussed in more detail in this blog post about writing good workers for Camunda Cloud.

Note that when completing jobs programmatically, you must specify autoComplete = false. Otherwise, there is a race condition between your programmatic job completion and the job completion performed by the Quarkus extension, which can lead to unpredictable results.
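
The callback style described in this section can be tried out without a broker. The sketch below uses a plain java.util.concurrent.CompletableFuture in place of the future returned by send(), assuming that future behaves like a standard CompletionStage; CompletionCallbacks and observe are hypothetical names used only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class CompletionCallbacks {
    /** Registers a callback on the command future and returns without blocking. */
    static CompletableFuture<String> observe(CompletableFuture<String> command,
                                             AtomicReference<String> log) {
        return command.whenComplete((result, error) -> {
            if (error != null) {
                log.set("failed: " + error.getMessage());
            } else {
                log.set("completed: " + result);
            }
        });
    }

    public static void main(String[] args) {
        AtomicReference<String> log = new AtomicReference<>("pending");
        CompletableFuture<String> command = new CompletableFuture<>();

        observe(command, log);
        // The callback has not run yet; the calling thread was never blocked.
        System.out.println(log.get());

        // Simulates the engine acknowledging the command later on.
        command.complete("job-1");
        System.out.println(log.get());
    }
}
```

The calling thread continues immediately after observe(...), which is what allows a worker to pick up further jobs while earlier commands are still in flight.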

Throwing ZeebeBpmnError

Whenever your code hits a problem that should lead to a BPMN error being raised, you can simply throw a ZeebeBpmnError providing the error code used in BPMN:

@JobWorker(type = "job1")
public Map<String, Object> job1(final ActivatedJob job) {
    // ... custom code ...
    if (ok) {
       return responseMap;
    } else {
       throw new ZeebeBpmnError("Error code", "Error message");
    }
}

Non-blocking Methods

By default, a job worker method is executed on the main executor for blocking tasks. As a result, a technology that is designed to run on a Vert.x event loop (such as Hibernate Reactive) cannot be used inside the method body. For this reason, a job worker method that returns java.util.concurrent.CompletionStage<?> or io.smallrye.mutiny.Uni<Void>, or that is annotated with @io.smallrye.common.annotation.NonBlocking, is executed on the Vert.x event loop instead.

@JobWorker(type = "job1")
public Uni<Void> job1(final ActivatedJob job) {
    // ... custom code ...
    // no need to call client.newCompleteCommand()...
    return Uni.createFrom().voidItem();
}

The return type Uni<Void> instructs the job worker to execute the method on the Vert.x event loop.

Testing

To use the test extension, add this dependency to the project:

<dependency>
    <groupId>io.quarkiverse.zeebe</groupId>
    <artifactId>quarkus-zeebe-test</artifactId>
    <version>{version}</version>
    <scope>test</scope>
</dependency>

Test

To use the ZeebeClient and BpmnAssert in your tests, annotate the test class with @QuarkusTestResource(ZeebeTestResource.class) and enable this configuration:

quarkus.zeebe.devservices.enabled=true

Test example

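import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;

import java.util.Map;

import org.junit.jupiter.api.Test;

import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.response.ProcessInstanceEvent;
import io.camunda.zeebe.process.test.assertions.BpmnAssert;
import io.camunda.zeebe.process.test.assertions.ProcessInstanceAssert;
import io.quarkiverse.zeebe.test.InjectZeebeClient;
import io.quarkiverse.zeebe.test.ZeebeTestResource;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;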

@QuarkusTest
@QuarkusTestResource(ZeebeTestResource.class)
public class BaseTest {

    @InjectZeebeClient
    ZeebeClient client;

    @Test
    public void startProcessTest() {
        ProcessInstanceEvent event = client.newCreateInstanceCommand()
                .bpmnProcessId("test").latestVersion()
                .variables(Map.of("k","v")).send().join();

        ProcessInstanceAssert a = BpmnAssert.assertThat(event);
        await().atMost(7, SECONDS).untilAsserted(a::isCompleted);
    }
}

We can reuse the test for the integration test.

import io.quarkus.test.junit.QuarkusIntegrationTest;

@QuarkusIntegrationTest
public class BaseIT extends BaseTest {

}

For more information check examples in the integration-tests directory in this repo.

Testing with embedded engine

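The Zeebe process test embedded engine requires Java version >= 17.

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;

import java.util.Map;

import org.junit.jupiter.api.Test;

import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.response.ProcessInstanceEvent;
import io.camunda.zeebe.process.test.assertions.BpmnAssert;
import io.camunda.zeebe.process.test.assertions.ProcessInstanceAssert;
import io.quarkiverse.zeebe.test.InjectZeebeClient;
import io.quarkiverse.zeebe.test.ZeebeTestEmbeddedResource;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;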

@QuarkusTest
@QuarkusTestResource(ZeebeTestEmbeddedResource.class)
public class BaseTest {

    @InjectZeebeClient
    ZeebeClient client;

    @Test
    public void startProcessTest() {
        ProcessInstanceEvent event = client.newCreateInstanceCommand()
                .bpmnProcessId("test").latestVersion()
                .variables(Map.of("k","v")).send().join();

        ProcessInstanceAssert a = BpmnAssert.assertThat(event);
        await().atMost(7, SECONDS).untilAsserted(a::isCompleted);
    }
}

Extension Configuration Reference

Configuration property fixed at build time - All other configuration properties are overridable at runtime

Each entry below gives the description of a configuration property, followed by its environment variable, its type, and its default value (if any).

Whether DevServices has been explicitly enabled or disabled. DevServices is generally enabled by default, unless there is an existing configuration present. When DevServices is enabled, Quarkus will attempt to automatically configure and start a Zeebe broker when running in Dev or Test mode and when Docker is running.

Environment variable: QUARKUS_ZEEBE_DEVSERVICES_ENABLED
Type: boolean
Default: true

Optional fixed port the dev service will listen to. If not defined, the port will be chosen randomly.

Environment variable: QUARKUS_ZEEBE_DEVSERVICES_PORT
Type: int

Indicates if the Zeebe server managed by Quarkus Dev Services is shared. When shared, Quarkus looks for running containers using label-based service discovery. If a matching container is found, it is used, and so a second one is not started. Otherwise, Dev Services for Zeebe starts a new container. The discovery uses the quarkus-dev-service-zeebe label. The value is configured using the service-name property. Container sharing is only used in dev mode.

Environment variable: QUARKUS_ZEEBE_DEVSERVICES_SHARED
Type: boolean
Default: true

The value of the quarkus-dev-service-zeebe label attached to the started container. This property is used when shared is set to true. In this case, before starting a container, Dev Services for Zeebe looks for a container with the quarkus-dev-service-zeebe label set to the configured value. If found, it will use this container instead of starting a new one. Otherwise, it starts a new container with the quarkus-dev-service-zeebe label set to the specified value. This property is used when you need multiple shared Zeebe servers.

Environment variable: QUARKUS_ZEEBE_DEVSERVICES_SERVICE_NAME
Type: string
Default: zeebe

The container image name to use, for container based DevServices providers.

Environment variable: QUARKUS_ZEEBE_DEVSERVICES_IMAGE_NAME
Type: string

Helper to define the stop strategy for containers created by DevServices. In particular, we don’t want to actually stop the containers when they have been flagged for reuse, and when the Testcontainers configuration has been explicitly set to allow container reuse. To enable reuse, add testcontainers.reuse.enable=true to the .testcontainers.properties file in your home directory.

Environment variable: QUARKUS_ZEEBE_DEVSERVICES_REUSE
Type: boolean
Default: false

Enable or disable dev monitor for dev-services.

Environment variable: QUARKUS_ZEEBE_DEVSERVICES_MONITOR_ENABLED
Type: boolean
Default: false

Optional fixed port the dev monitor will listen to. If not defined, the port will be chosen randomly.

Environment variable: QUARKUS_ZEEBE_DEVSERVICES_MONITOR_PORT
Type: int

The container image name to use for the container based Zeebe dev monitor.

Environment variable: QUARKUS_ZEEBE_DEVSERVICES_MONITOR_IMAGE_NAME
Type: string
Default: ghcr.io/lorislab/zeebe-dev-monitor:8.1

The value of the quarkus-dev-service-zeebe label attached to the started container. This property is used when shared is set to true. In this case, before starting a container, Dev Services for Zeebe looks for a container with the quarkus-dev-service-zeebe label set to the configured value. If found, it will use this container instead of starting a new one. Otherwise, it starts a new container with the quarkus-dev-service-zeebe label set to the specified value. This property is used when you need multiple shared Zeebe servers.

Environment variable: QUARKUS_ZEEBE_DEVSERVICES_MONITOR_SERVICE_NAME
Type: string
Default: zeebe-dev-monitor

Helper to define the stop strategy for containers created by DevServices. In particular, we don’t want to actually stop the containers when they have been flagged for reuse, and when the Testcontainers configuration has been explicitly set to allow container reuse. To enable reuse, add testcontainers.reuse.enable=true to the .testcontainers.properties file in your home directory.

Environment variable: QUARKUS_ZEEBE_DEVSERVICES_MONITOR_REUSE
Type: boolean
Default: false

Optional fixed debug export receiver port the dev service will listen to. If not defined, the port will be chosen randomly.

Environment variable: QUARKUS_ZEEBE_DEVSERVICES_TEST_RECEIVER_PORT
Type: int

Disable or enable debug exporter for the test.

Environment variable: QUARKUS_ZEEBE_DEVSERVICES_TEST_EXPORTER
Type: boolean
Default: true

Enable or disable debug exporter.

Environment variable: QUARKUS_ZEEBE_DEVSERVICES_DEBUG_EXPORTER_ENABLED
Type: boolean
Default: false

Fixed debug export receiver port the localhost service will listen to.

Environment variable: QUARKUS_ZEEBE_DEVSERVICES_DEBUG_EXPORTER_RECEIVER_PORT
Type: int
Default: 8080

Observe changes in the bpmn files.

Environment variable: QUARKUS_ZEEBE_DEV_MODE_WATCH_BPMN_FILES
Type: boolean
Default: true

Observe changes in the bpmn directory and subdirectories.

Environment variable: QUARKUS_ZEEBE_DEV_MODE_WATCH_BPMN_DIR
Type: boolean
Default: true

Observe changes in the job worker.

Environment variable: QUARKUS_ZEEBE_DEV_MODE_WATCH_JOB_WORKER
Type: boolean
Default: true

Whether to automatically scan the BPMN process folder.

Environment variable: QUARKUS_ZEEBE_RESOURCES_ENABLED
Type: boolean
Default: true

BPMN process root folder.

Environment variable: QUARKUS_ZEEBE_RESOURCES_LOCATION
Type: string
Default: bpmn

Whether metrics are enabled in case the micrometer or micro-profile metrics extension is present.

Environment variable: QUARKUS_ZEEBE_METRICS_ENABLED
Type: boolean
Default: true

Whether a health check is published in case the smallrye-health extension is present.

Environment variable: QUARKUS_ZEEBE_HEALTH_ENABLED
Type: boolean
Default: true

Whether tracing is published in case the smallrye-opentracing extension is present.

Environment variable: QUARKUS_ZEEBE_TRACING_ENABLED
Type: boolean
Default: true