diff --git a/pom.xml b/pom.xml index 73b0c5ba6dc..6960ff68d60 100644 --- a/pom.xml +++ b/pom.xml @@ -89,6 +89,7 @@ <module>pubsub/cloud-client</module> <module>spanner/cloud-client</module> <module>speech/cloud-client</module> + <module>spring/pubsub</module> <module>storage/cloud-client</module> <module>storage/json-api</module> <module>storage/storage-transfer</module> diff --git a/spring/pubsub/README.md b/spring/pubsub/README.md new file mode 100644 index 00000000000..ea3b076e5e9 --- /dev/null +++ b/spring/pubsub/README.md @@ -0,0 +1,144 @@ +# Getting started with Spring Integration channel adapters for Google Cloud Pub/Sub + +This is a sample application that uses Spring Integration and Spring Boot to read and write messages +to Google Cloud Pub/Sub. + +PubsubApplication is a typical Spring Boot application. We declare all the necessary beans for the +application to work in the `PubsubApplication` class. The most important ones are the inbound and +outbound channel adapters. + +## Channel adapters + +On Spring Integration, channel adapters are adapters that send or receive messages from external +systems, convert them to/from an internal Spring message representation and read/write them to a +Spring channel, which can then have other components attached to it, such as service activators. + +### Inbound channel adapter + +PubsubInboundChannelAdapter is Spring Cloud GCP Pub/Sub inbound channel adapter class. It's declared +in the user app as follows: + +``` +@Bean +public PubsubInboundChannelAdapter messageChannelAdapter( + @Qualifier("pubsubInputChannel") MessageChannel inputChannel, + SubscriberFactory subscriberFactory) { + PubsubInboundChannelAdapter adapter = + new PubsubInboundChannelAdapter(subscriberFactory, "messages"); + adapter.setOutputChannel(inputChannel); + adapter.setAckMode(AckMode.MANUAL); + + return adapter; +} +``` + +In the example, we instantiate the `PubsubInboundChannelAdapter` object with a SubscriberFactory and +a Google Cloud Pub/Sub subscription name, from where the adapter listens to messages, and then set +its output channel and ack mode. + +In apps which use the Spring Cloud GCP Pubsub Boot starter, a SubscriberFactory is automatically +provided. The subscription name (e.g., `"messages"`) is the name of a Google Cloud Pub/Sub +subscription that must already exist when the channel adapter is created. + +The input channel is a channel in which messages get into Spring from an external system. +In this example, we use a PublishSubscribeChannel, which broadcasts incoming messages to all its +subscribers, including service activators. + +``` +@Bean +public MessageChannel pubsubInputChannel() { + return new PublishSubscribeChannel(); +} +``` + +Setting the acknowledgement mode on the inbound channel adapter is optional. It is set to automatic +by default. If set to manual, messages must be explicitly acknowledged through the +`AckReplyConsumer` object from the Spring message header `GcpHeader.ACKNOWLEDGEMENT`. + +``` +AckReplyConsumer consumer = + (AckReplyConsumer) message.getHeaders().get(GcpHeaders.ACKNOWLEDGEMENT); +consumer.ack(); +``` + +A service activator is typically attached to a channel in order to process incoming messages. Here +is an example of a service activator that logs and acknowledges the received message. + +``` +@Bean +@ServiceActivator(inputChannel = "pubsubInputChannel") +public MessageHandler messageReceiver1() { + return message -> { + LOGGER.info("Message arrived! Payload: " + + ((ByteString) message.getPayload()).toStringUtf8()); + AckReplyConsumer consumer = + (AckReplyConsumer) message.getHeaders().get(GcpHeaders.ACKNOWLEDGEMENT); + consumer.ack(); + }; +} +``` + +### Outbound channel adapter + +PubSubMessageHandler is Spring Cloud GCP's Pub/Sub outbound channel adapter. It converts Spring +messages in a channel to an external representation and sends them to a Google Cloud Pub/Sub topic. + +``` +@Bean +@ServiceActivator(inputChannel = "pubsubOutputChannel") +public MessageHandler messageSender(PubsubTemplate pubsubTemplate) { + PubsubMessageHandler outboundAdapter = new PubsubMessageHandler(pubsubTemplate); + outboundAdapter.setTopic("test"); + return outboundAdapter; +} +``` + +`PubsubTemplate` is Spring Cloud GCP's abstraction to send messages to Google Cloud Pub/Sub. It +contains the logic to create a Google Cloud Pub/Sub `Publisher`, convert Spring messages to Google +Cloud Pub/Sub `PubsubMessage` and publish them to a topic. + +`PubsubMessageHandler` requires a `PubsubTemplate` to be instantiated. The Spring Cloud GCP Boot +Pubsub starter provides a pre-configured `PubsubTemplate`, ready to use. `PubsubMessageHandler` +also requires the name of a Google Cloud Pub/Sub topic, which must exist before any messages are +sent. + +We use a messaging gateway to write to a Spring channel. + +``` +@MessagingGateway(defaultRequestChannel = "pubsubOutputChannel") +public interface PubsubOutboundGateway { + + void sendToPubsub(String text); +} +``` + +Spring auto-generates the output channel, as well as the gateway code and injects it to the local +variable in `WebAppController`. + +``` +@Autowired +private PubsubOutboundGateway messagingGateway; +``` + +## Administration + +The Spring Cloud GCP Pubsub package provides a Google Cloud Pub/Sub administration utility, +`PubsubAdmin`, to simplify the creation, listing and deletion of Google Cloud Pub/Sub topics and +subscriptions. The Spring Cloud GCP Pubsub starter provides a pre-configured `PubsubAdmin`, based on +an application's properties. + +``` +@Autowired +private PubsubAdmin admin; +``` + +## Sample application + +This sample application uses Spring Boot and Spring Web to declare a REST controller. The front-end +uses client-side scripting with Angular. + +It is exemplified how to: +* Send messages to a Google Cloud Pub/Sub topic through an outbound channel adapter; +* Receive and process messages from a Google Cloud Pub/Sub subscription through an inbound channel +adapter; +* Create new Google Cloud Pub/Sub topics and subscriptions through the Pub/Sub admin utility. diff --git a/spring/pubsub/pom.xml b/spring/pubsub/pom.xml new file mode 100644 index 00000000000..075b9142b19 --- /dev/null +++ b/spring/pubsub/pom.xml @@ -0,0 +1,120 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>doc-samples</artifactId> + <groupId>com.google.cloud</groupId> + <version>1.0.0</version> + <relativePath>../..</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + <packaging>jar</packaging> + + <artifactId>spring-integration-pubsub-samples</artifactId> + + <properties> + <spring-cloud-gcp.version>1.0.0.BUILD-SNAPSHOT</spring-cloud-gcp.version> + <spring-boot.version>1.5.2.RELEASE</spring-boot.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.springframework.cloud</groupId> + <artifactId>spring-cloud-gcp-starter-pubsub</artifactId> + <version>${spring-cloud-gcp.version}</version> + </dependency> + <dependency> + <groupId>org.springframework.cloud</groupId> + <artifactId>spring-integration-gcp</artifactId> + <version>${spring-cloud-gcp.version}</version> + </dependency> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-web</artifactId> + <version>${spring-boot.version}</version> + </dependency> + </dependencies> + + <!-- Angular --> + + <build> + <resources> + <resource> + <directory>src/main/resources</directory> + </resource> + <resource> + <directory>${project.build.directory}/generated-resources</directory> + </resource> + </resources> + <plugins> + <plugin> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-maven-plugin</artifactId> + <version>1.5.2.RELEASE</version> + </plugin> + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <version>3.0.1</version> + <executions> + <execution> + <!-- Serves *only* to filter the wro.xml so it can get an absolute + path for the project --> + <id>copy-resources</id> + <phase>validate</phase> + <goals> + <goal>copy-resources</goal> + </goals> + <configuration> + <outputDirectory>${basedir}/target/wro</outputDirectory> + <resources> + <resource> + <directory>src/main/wro</directory> + <filtering>true</filtering> + </resource> + </resources> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>ro.isdc.wro4j</groupId> + <artifactId>wro4j-maven-plugin</artifactId> + <version>1.7.6</version> + <executions> + <execution> + <phase>generate-resources</phase> + <goals> + <goal>run</goal> + </goals> + </execution> + </executions> + <configuration> + <wroManagerFactory>ro.isdc.wro.maven.plugin.manager.factory.ConfigurableWroManagerFactory</wroManagerFactory> + <cssDestinationFolder>${project.build.directory}/generated-resources/static/css</cssDestinationFolder> + <jsDestinationFolder>${project.build.directory}/generated-resources/static/js</jsDestinationFolder> + <wroFile>${project.build.directory}/wro/wro.xml</wroFile> + <extraConfigFile>src/main/wro/wro.properties</extraConfigFile> + <contextFolder>${basedir}/src/main/wro</contextFolder> + </configuration> + <dependencies> + <dependency> + <groupId>org.webjars</groupId> + <artifactId>jquery</artifactId> + <version>2.1.1</version> + </dependency> + <dependency> + <groupId>org.webjars</groupId> + <artifactId>angularjs</artifactId> + <version>1.3.8</version> + </dependency> + <dependency> + <groupId>org.webjars</groupId> + <artifactId>bootstrap</artifactId> + <version>3.2.0</version> + </dependency> + </dependencies> + </plugin> + </plugins> + </build> +</project> diff --git a/spring/pubsub/src/main/java/com/example/spring/pubsub/PubsubApplication.java b/spring/pubsub/src/main/java/com/example/spring/pubsub/PubsubApplication.java new file mode 100644 index 00000000000..047bd9490f8 --- /dev/null +++ b/spring/pubsub/src/main/java/com/example/spring/pubsub/PubsubApplication.java @@ -0,0 +1,145 @@ +/* + * Copyright 2017 original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.spring.pubsub; + +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.protobuf.ByteString; +import java.io.IOException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.gcp.pubsub.core.PubsubTemplate; +import org.springframework.cloud.gcp.pubsub.support.GcpHeaders; +import org.springframework.cloud.gcp.pubsub.support.SubscriberFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.PublishSubscribeChannel; +import org.springframework.integration.gcp.AckMode; +import org.springframework.integration.gcp.inbound.PubsubInboundChannelAdapter; +import org.springframework.integration.gcp.outbound.PubsubMessageHandler; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +@SpringBootApplication +public class PubsubApplication { + + private static final Log LOGGER = LogFactory.getLog(PubsubApplication.class); + + public static void main(String[] args) throws IOException { + SpringApplication.run(PubsubApplication.class, args); + } + + // Inbound channel adapter. + + /** + * Spring channel for incoming messages from Google Cloud Pub/Sub. + * + * <p>We use a {@link PublishSubscribeChannel} which broadcasts messages to every subscriber. In + * this case, every service activator. + */ + @Bean + public MessageChannel pubsubInputChannel() { + return new PublishSubscribeChannel(); + } + + /** + * Inbound channel adapter that gets activated whenever a new message arrives at a Google Cloud + * Pub/Sub subscription. + * + * <p>Messages get posted to the specified input channel, which activates the service activators + * below. + * + * @param inputChannel Spring channel that receives messages and triggers attached service + * activators + * @param subscriberFactory creates the subscriber that listens to messages from Google Cloud + * Pub/Sub + * @return the inbound channel adapter for a Google Cloud Pub/Sub subscription + */ + @Bean + public PubsubInboundChannelAdapter messageChannelAdapter( + @Qualifier("pubsubInputChannel") MessageChannel inputChannel, + SubscriberFactory subscriberFactory) { + PubsubInboundChannelAdapter adapter = + new PubsubInboundChannelAdapter(subscriberFactory, "messages"); + adapter.setOutputChannel(inputChannel); + adapter.setAckMode(AckMode.MANUAL); + + return adapter; + } + + /** + * Message handler that gets triggered whenever a new message arrives at the attached Spring + * channel. + * + * <p>Just logs the received message. Message acknowledgement mode set to manual above, so the + * consumer that allows us to (n)ack is extracted from the message headers and used to ack. + */ + @Bean + @ServiceActivator(inputChannel = "pubsubInputChannel") + public MessageHandler messageReceiver1() { + return message -> { + LOGGER.info("Message arrived! Payload: " + + ((ByteString) message.getPayload()).toStringUtf8()); + AckReplyConsumer consumer = + (AckReplyConsumer) message.getHeaders().get(GcpHeaders.ACKNOWLEDGEMENT); + consumer.ack(); + }; + } + + /** + * Second message handler that also gets messages from the same subscription as above. + */ + @Bean + @ServiceActivator(inputChannel = "pubsubInputChannel") + public MessageHandler messageReceiver2() { + return message -> { + LOGGER.info("Message also arrived here! Payload: " + + ((ByteString) message.getPayload()).toStringUtf8()); + AckReplyConsumer consumer = + (AckReplyConsumer) message.getHeaders().get(GcpHeaders.ACKNOWLEDGEMENT); + consumer.ack(); + }; + } + + // Outbound channel adapter + + /** + * The outbound channel adapter to write messages from a Spring channel to a Google Cloud Pub/Sub + * topic. + * + * @param pubsubTemplate Spring abstraction to send messages to Google Cloud Pub/Sub topics + */ + @Bean + @ServiceActivator(inputChannel = "pubsubOutputChannel") + public MessageHandler messageSender(PubsubTemplate pubsubTemplate) { + PubsubMessageHandler outboundAdapter = new PubsubMessageHandler(pubsubTemplate); + outboundAdapter.setTopic("test"); + return outboundAdapter; + } + + /** + * A Spring mechanism to write messages to a channel. + */ + @MessagingGateway(defaultRequestChannel = "pubsubOutputChannel") + public interface PubsubOutboundGateway { + + void sendToPubsub(String text); + } +} diff --git a/spring/pubsub/src/main/java/com/example/spring/pubsub/WebAppController.java b/spring/pubsub/src/main/java/com/example/spring/pubsub/WebAppController.java new file mode 100644 index 00000000000..bcd4f534423 --- /dev/null +++ b/spring/pubsub/src/main/java/com/example/spring/pubsub/WebAppController.java @@ -0,0 +1,109 @@ +/* + * Copyright 2017 original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.spring.pubsub; + +import com.example.spring.pubsub.PubsubApplication.PubsubOutboundGateway; +import com.google.pubsub.v1.Subscription; +import com.google.pubsub.v1.SubscriptionName; +import com.google.pubsub.v1.Topic; +import com.google.pubsub.v1.TopicName; +import java.util.List; +import java.util.stream.Collectors; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.gcp.pubsub.PubsubAdmin; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.servlet.view.RedirectView; + +@RestController +public class WebAppController { + + @Autowired + private PubsubOutboundGateway messagingGateway; + + @Autowired + private PubsubAdmin admin; + + /** + * Lists every topic in the project. + * + * @return a list of the names of every topic in the project + */ + @GetMapping("/listTopics") + public List<String> listTopics() { + return admin + .listTopics() + .stream() + .map(Topic::getNameAsTopicName) + .map(TopicName::getTopic) + .collect(Collectors.toList()); + } + + /** + * Lists every subscription in the project. + * + * @return a list of the names of every subscription in the project + */ + @GetMapping("/listSubscriptions") + public List<String> listSubscriptions() { + return admin + .listSubscriptions() + .stream() + .map(Subscription::getNameAsSubscriptionName) + .map(SubscriptionName::getSubscription) + .collect(Collectors.toList()); + } + + /** + * Posts a message to a Google Cloud Pub/Sub topic, through Spring's messaging gateway, and + * redirects the user to the home page. + * + * @param message the message posted to the Pub/Sub topic + */ + @PostMapping("/postMessage") + public RedirectView addMessage(@RequestParam("message") String message) { + messagingGateway.sendToPubsub(message); + return new RedirectView("/"); + } + + /** + * Creates a new topic on Google Cloud Pub/Sub, through Spring's Pub/Sub admin class, and + * redirects the user to the home page. + * + * @param topicName the name of the new topic + */ + @PostMapping("/newTopic") + public RedirectView newTopic(@RequestParam("name") String topicName) { + admin.createTopic(topicName); + return new RedirectView("/"); + } + + /** + * Creates a new subscription on Google Cloud Pub/Sub, through Spring's Pub/Sub admin class, and + * redirects the user to the home page. + * + * @param topicName the name of the new subscription + */ + @PostMapping("/newSubscription") + public RedirectView newSubscription( + @RequestParam("name") String subscriptionName, @RequestParam("topic") String topicName) { + admin.createSubscription(subscriptionName, topicName); + return new RedirectView("/"); + } +} diff --git a/spring/pubsub/src/main/resources/application.properties b/spring/pubsub/src/main/resources/application.properties new file mode 100644 index 00000000000..1080c9b04bc --- /dev/null +++ b/spring/pubsub/src/main/resources/application.properties @@ -0,0 +1,6 @@ +#spring.cloud.gcp.projectId=[YOUR_PROJECT_ID] +#spring.cloud.gcp.credentialsLocation=file:[LOCAL_PATH_TO_CREDENTIALS] +# +#spring.cloud.gcp.pubsub.subscriber.executorThreads=[SUBSCRIBER_THREADS] +#spring.cloud.gcp.pubsub.subscriber.ackDeadline=[ACK_DEADLINE_SECONDS] +#spring.cloud.gcp.pubsub.publisher.executorThreads=[PUBLISHER_THREADS] diff --git a/spring/pubsub/src/main/resources/static/index.html b/spring/pubsub/src/main/resources/static/index.html new file mode 100644 index 00000000000..d5a1ad8675a --- /dev/null +++ b/spring/pubsub/src/main/resources/static/index.html @@ -0,0 +1,51 @@ +<!DOCTYPE html> +<!-- + ~ Copyright 2017 original author or authors. + ~ + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<html lang="en"> +<head> + <meta charset="UTF-8"> + <title>Spring Integration GCP sample</title> + <link href="css/angular-bootstrap.css" rel="stylesheet"> + <script src="js/angular-bootstrap.js" type="text/javascript"></script> + <script src="js/pubsub.js"></script> +</head> +<body ng-app="pubsub"> +<div name="formDiv"> + <form action="/postMessage" method="post"> + Post message: <input type="text" name="message" /> <input type="submit" /> + </form> + <form action="/newTopic" method="post"> + New topic: <input type="text" name="name" /> <input type="submit" /> + </form> + <form action="/newSubscription" method="post"> + New subscription: <input type="text" name="name" /> + for topic <input type="text" name="topic" /> <input type="submit" /> + </form> +</div> +<div ng-controller="listTopics"> + <h1>Topics</h1> + <div ng-repeat="topic in topics"> + {{ topic }}<br/> + </div> +</div> +<div ng-controller="listSubscriptions"> + <h1>Subscriptions</h1> + <div ng-repeat="subscription in subscriptions"> + {{ subscription }}<br/> + </div> +</div> +</body> +</html> diff --git a/spring/pubsub/src/main/resources/static/js/pubsub.js b/spring/pubsub/src/main/resources/static/js/pubsub.js new file mode 100644 index 00000000000..288f07cb991 --- /dev/null +++ b/spring/pubsub/src/main/resources/static/js/pubsub.js @@ -0,0 +1,27 @@ +/* + * Copyright 2017 original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +angular.module('pubsub', []) + .controller('listTopics', function($scope, $http) { + $http.get('/listTopics').success(function(data) { + $scope.topics = data; + }); + }) + .controller('listSubscriptions', function($scope, $http) { + $http.get('/listSubscriptions').success(function(data) { + $scope.subscriptions = data; + }) + }); diff --git a/spring/pubsub/src/main/wro/main.less b/spring/pubsub/src/main/wro/main.less new file mode 100644 index 00000000000..e69de29bb2d diff --git a/spring/pubsub/src/main/wro/wro.properties b/spring/pubsub/src/main/wro/wro.properties new file mode 100644 index 00000000000..1502f7dc98b --- /dev/null +++ b/spring/pubsub/src/main/wro/wro.properties @@ -0,0 +1,4 @@ +#List of preProcessors +preProcessors=lessCssImport +#List of postProcessors +postProcessors=less4j,jsMin diff --git a/spring/pubsub/src/main/wro/wro.xml b/spring/pubsub/src/main/wro/wro.xml new file mode 100644 index 00000000000..40873c376fc --- /dev/null +++ b/spring/pubsub/src/main/wro/wro.xml @@ -0,0 +1,9 @@ +<groups xmlns="http://www.isdc.ro/wro"> + <group name="angular-bootstrap"> + <css>webjar:bootstrap/3.2.0/less/bootstrap.less</css> + <css>file:${project.basedir}/src/main/wro/main.less</css> + <js>webjar:jquery/2.1.1/jquery.min.js</js> + <js>webjar:bootstrap/3.2.0/bootstrap.js</js> + <js>webjar:angularjs/1.3.8/angular.min.js</js> + </group> +</groups>