Google Cloud Services - PubSub

This extension allows using Google Cloud PubSub inside your Quarkus application.

Be sure to read the Google Cloud Services extension pack global documentation before this one; it contains general configuration and information.

Bootstrapping the project

First, create a new project with the following command (replace the version placeholder with the correct one):

mvn io.quarkus:quarkus-maven-plugin:<quarkusVersion>:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=pubsub-quickstart \
    -Dextensions="resteasy-reactive-jackson,quarkus-google-cloud-pubsub"
cd pubsub-quickstart

This command generates a Maven project, importing the Google Cloud PubSub extension.

If you already have your Quarkus project configured, you can add the quarkus-google-cloud-pubsub extension to your project by running the following command in your project base directory:

./mvnw quarkus:add-extension -Dextensions="quarkus-google-cloud-pubsub"

This will add the following to your pom.xml:

<dependency>
    <groupId>io.quarkiverse.googlecloudservices</groupId>
    <artifactId>quarkus-google-cloud-pubsub</artifactId>
</dependency>

Preparatory steps

To test PubSub, you first need to create a topic named test-topic.

You can create one with gcloud:

gcloud pubsub topics create test-topic

Authentication

This extension provides a QuarkusPubSub CDI bean that helps you interact with Google PubSub. QuarkusPubSub is automatically authenticated, so you don’t have to do anything else to use it.

If you don’t want to use QuarkusPubSub, be sure to configure authentication yourself. By default, PubSub requires the GOOGLE_APPLICATION_CREDENTIALS environment variable to define its credentials, so you need to set it instead of relying on the quarkus.google.cloud.service-account-location property. Alternatively, inject the CredentialsProvider provided by the extension and pass it to the various PubSub builders and settings objects when instantiating PubSub components.
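When relying on the environment variable, it can help to fail fast if it is missing or points to a file that cannot be read. The following is a minimal sketch using only the JDK; CredentialsEnvCheck and its methods are hypothetical names introduced for illustration and are not part of the extension. It only checks that the file exists and is readable; it does not validate the key itself.

```java
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;

// Hypothetical helper for illustration; it is not part of the extension.
final class CredentialsEnvCheck {

    // Environment variable named in the paragraph above
    static final String ENV_VAR = "GOOGLE_APPLICATION_CREDENTIALS";

    private CredentialsEnvCheck() {
    }

    // Returns true when the value names an existing, readable regular file.
    static boolean pointsToReadableFile(String value) {
        if (value == null || value.isBlank()) {
            return false;
        }
        try {
            Path path = Path.of(value);
            return Files.isRegularFile(path) && Files.isReadable(path);
        } catch (InvalidPathException e) {
            // The value cannot be interpreted as a file system path
            return false;
        }
    }

    // Checks the variable in the current process environment.
    static boolean isConfigured() {
        return pointsToReadableFile(System.getenv(ENV_VAR));
    }
}
```

One possible use is to call CredentialsEnvCheck.isConfigured() during application startup and log a warning when it returns false, before any PubSub component is built by hand.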

Example

This is an example usage of the extension: we create a REST resource with a single endpoint that sends a message to the test-topic topic when hit.

We also register a consumer to the same topic at @PostConstruct time that logs all received messages on the topic so we can check that it works.

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.jboss.logging.Logger;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.Topic;

import io.quarkiverse.googlecloudservices.pubsub.QuarkusPubSub;

@Path("/pubsub")
public class PubSubResource {
    private static final Logger LOG = Logger.getLogger(PubSubResource.class);

    @Inject
    QuarkusPubSub pubSub;

    private Subscriber subscriber;

    @PostConstruct
    void init() throws IOException {
        // init topic and subscription
        pubSub.createTopic("test-topic");
        pubSub.createSubscription("test-topic", "test-subscription");

        // Subscribe to PubSub
        MessageReceiver receiver = (message, consumer) -> {
            LOG.infov("Got message {0}", message.getData().toStringUtf8());
            consumer.ack();
        };
        subscriber = pubSub.subscriber("test-subscription", receiver);
        subscriber.startAsync().awaitRunning();
    }

    @PreDestroy
    void destroy() {
        // Stop the subscription at destroy time
        if (subscriber != null) {
            subscriber.stopAsync();
        }
    }

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public void pubsub() throws IOException, InterruptedException {
        // Init a publisher to the topic
        Publisher publisher = pubSub.publisher("test-topic");

        try {
            // Create a new message
            ByteString data = ByteString.copyFromUtf8("my-message");
            PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
            // Publish the message
            ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
            // Wait for message submission and log the result
            ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<String>() {
                public void onSuccess(String messageId) {
                    LOG.infov("published with message id {0}", messageId);
                }

                public void onFailure(Throwable t) {
                    LOG.warnv("failed to publish: {0}", t);
                }
            }, MoreExecutors.directExecutor());
        } finally {
            publisher.shutdown();
            publisher.awaitTermination(1, TimeUnit.MINUTES);
        }
    }
}

Dev Service

Configuring the Dev Service

The extension provides a Dev Service that can be used to run a local PubSub emulator. This is useful for testing purposes, so you don’t have to rely on a real PubSub instance. By default, the Dev Service is disabled. It is configured with the following properties:

  • quarkus.google.cloud.pubsub.devservice.enabled: set it to true to enable the Dev Service

  • quarkus.google.cloud.pubsub.devservice.emulator-port: the port on which the emulator will be started (by default there is no port set, so the emulator will use a random port)
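As an illustration, the two Dev Service properties could be set in application.properties as follows. The port value 8085 is an arbitrary choice for this sketch; omit that line to keep the random port.

```properties
# Enable the PubSub Dev Service (disabled by default)
quarkus.google.cloud.pubsub.devservice.enabled=true
# Optional: pin the emulator port (8085 is an arbitrary example value)
quarkus.google.cloud.pubsub.devservice.emulator-port=8085
```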

Using the Dev Service

If we want to connect to the Dev Service, we need to specify a TransportChannelProvider when creating subscriptions and publishers.

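We can reuse the code from the previous example and add the TransportChannelProvider to the Subscriber and Publisher. Note that the snippets below build these objects directly with their builders instead of going through QuarkusPubSub, and that subscriptionName, topicName, and credentialsProvider are assumed to be defined elsewhere in the class. So what do we need to change?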

First, we declare a field for the TransportChannelProvider so that it can be reused, and we inject the quarkus.google.cloud.pubsub.emulator-host property:

@ConfigProperty(name = "quarkus.google.cloud.pubsub.emulator-host")
String emulatorHost;

private TransportChannelProvider channelProvider;

Then, inside the init method, we create a TransportChannelProvider that connects to the Dev Service:

// Create a ChannelProvider that connects to the Dev Service
ManagedChannel channel = ManagedChannelBuilder.forTarget(emulatorHost).usePlaintext().build();
channelProvider = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));

In the same method, we set the TransportChannelProvider when creating the Subscriber:

// Create a subscriber and set the ChannelProvider
subscriber = Subscriber.newBuilder(subscriptionName, receiver).setChannelProvider(channelProvider).build();
subscriber.startAsync().awaitRunning();

The same is done when creating the Publisher in the pubsub method:

// Init a publisher to the topic
Publisher publisher = Publisher.newBuilder(topicName)
        .setCredentialsProvider(credentialsProvider)
        // Set the ChannelProvider
        .setChannelProvider(channelProvider)
        .build();

Finally, we also set the TransportChannelProvider when creating the SubscriptionAdminClient in the initSubscription method:

SubscriptionAdminSettings subscriptionAdminSettings = SubscriptionAdminSettings.newBuilder()
        .setCredentialsProvider(credentialsProvider)
        // Set the ChannelProvider
        .setTransportChannelProvider(channelProvider)
        .build();