Skip to content

Commit 43ef2e8

Browse files
Adds README and other smaller fixes
1 parent a06cd45 commit 43ef2e8

File tree

4 files changed

+152
-1
lines changed

4 files changed

+152
-1
lines changed

spring/pubsub/README.md

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
# Getting started with Spring Integration channel adapters for Google Cloud Pub/Sub
2+
3+
This is a sample application that uses Spring Integration and Spring Boot to read and write messages
4+
to Google Cloud Pub/Sub.
5+
6+
PubsubApplication is a typical Spring Boot application. We declare all the necessary beans for the
7+
application to work in the `PubsubApplication` class. The most important ones are the inbound and
8+
outbound channel adapters.
9+
10+
## Channel adapters
11+
12+
On Spring Integration, channel adapters are adapters that send or receive messages from external
13+
systems, convert them to/from an internal Spring message representation and read/write them to a
14+
Spring channel, which can then have other components attached to it, such as service activators.
15+
16+
### Inbound channel adapter
17+
18+
PubsubInboundChannelAdapter is Spring Cloud GCP Pub/Sub inbound channel adapter class. It's declared
19+
in the user app as follows:
20+
21+
```
22+
@Bean
23+
public PubsubInboundChannelAdapter messageChannelAdapter(
24+
@Qualifier("pubsubInputChannel") MessageChannel inputChannel,
25+
SubscriberFactory subscriberFactory) {
26+
PubsubInboundChannelAdapter adapter =
27+
new PubsubInboundChannelAdapter(subscriberFactory, "messages");
28+
adapter.setOutputChannel(inputChannel);
29+
adapter.setAckMode(AckMode.MANUAL);
30+
31+
return adapter;
32+
}
33+
```
34+
35+
In the example, we instantiate the `PubsubInboundChannelAdapter` object with a SubscriberFactory and
36+
a Google Cloud Pub/Sub subscription name, from where the adapter listens to messages, and then set
37+
its output channel and ack mode.
38+
39+
In apps which use the Spring Cloud GCP Pubsub Boot starter, a SubscriberFactory is automatically
40+
provided. The subscription name (e.g., `"messages"`) is the name of a Google Cloud Pub/Sub
41+
subscription that must already exist when the channel adapter is created.
42+
43+
The input channel is a channel in which messages get into Spring from an external system.
44+
In this example, we use a PublishSubscribeChannel, which broadcasts incoming messages to all its
45+
subscribers, including service activators.
46+
47+
```
48+
@Bean
49+
public MessageChannel pubsubInputChannel() {
50+
return new PublishSubscribeChannel();
51+
}
52+
```
53+
54+
Setting the acknowledgement mode on the inbound channel adapter is optional. It is set to automatic
55+
by default. If set to manual, messages must be explicitly acknowledged through the
56+
`AckReplyConsumer` object from the Spring message header `GcpHeader.ACKNOWLEDGEMENT`.
57+
58+
```
59+
AckReplyConsumer consumer =
60+
(AckReplyConsumer) message.getHeaders().get(GcpHeaders.ACKNOWLEDGEMENT);
61+
consumer.ack();
62+
```
63+
64+
A service activator is typically attached to a channel in order to process incoming messages. Here
65+
is an example of a service activator that logs and acknowledges the received message.
66+
67+
```
68+
@Bean
69+
@ServiceActivator(inputChannel = "pubsubInputChannel")
70+
public MessageHandler messageReceiver1() {
71+
return message -> {
72+
LOGGER.info("Message arrived! Payload: "
73+
+ ((ByteString) message.getPayload()).toStringUtf8());
74+
AckReplyConsumer consumer =
75+
(AckReplyConsumer) message.getHeaders().get(GcpHeaders.ACKNOWLEDGEMENT);
76+
consumer.ack();
77+
};
78+
}
79+
```
80+
81+
### Outbound channel adapter
82+
83+
PubSubMessageHandler is Spring Cloud GCP's Pub/Sub outbound channel adapter. It converts Spring
84+
messages in a channel to an external representation and sends them to a Google Cloud Pub/Sub topic.
85+
86+
```
87+
@Bean
88+
@ServiceActivator(inputChannel = "pubsubOutputChannel")
89+
public MessageHandler messageSender(PubsubTemplate pubsubTemplate) {
90+
PubsubMessageHandler outboundAdapter = new PubsubMessageHandler(pubsubTemplate);
91+
outboundAdapter.setTopic("test");
92+
return outboundAdapter;
93+
}
94+
```
95+
96+
`PubsubTemplate` is Spring Cloud GCP's abstraction to send messages to Google Cloud Pub/Sub. It
97+
contains the logic to create a Google Cloud Pub/Sub `Publisher`, convert Spring messages to Google
98+
Cloud Pub/Sub `PubsubMessage` and publish them to a topic.
99+
100+
`PubsubMessageHandler` requires a `PubsubTemplate` to be instantiated. The Spring Cloud GCP Boot
101+
Pubsub starter provides a pre-configured `PubsubTemplate`, ready to use. `PubsubMessageHandler`
102+
also requires the name of a Google Cloud Pub/Sub topic, which must exist before any messages are
103+
sent.
104+
105+
We use a messaging gateway to write to a Spring channel.
106+
107+
```
108+
@MessagingGateway(defaultRequestChannel = "pubsubOutputChannel")
109+
public interface PubsubOutboundGateway {
110+
111+
void sendToPubsub(String text);
112+
}
113+
```
114+
115+
Spring auto-generates the output channel, as well as the gateway code and injects it to the local
116+
variable in `WebAppController`.
117+
118+
```
119+
@Autowired
120+
private PubsubOutboundGateway messagingGateway;
121+
```
122+
123+
## Administration
124+
125+
The Spring Cloud GCP Pubsub package provides a Google Cloud Pub/Sub administration utility,
126+
`PubsubAdmin`, to simplify the creation, listing and deletion of Google Cloud Pub/Sub topics and
127+
subscriptions. The Spring Cloud GCP Pubsub starter provides a pre-configured `PubsubAdmin`, based on
128+
an application's properties.
129+
130+
```
131+
@Autowired
132+
private PubsubAdmin admin;
133+
```
134+
135+
## Sample application
136+
137+
This sample application uses Spring Boot and Spring Web to declare a REST controller. The front-end
138+
uses client-side scripting with Angular.
139+
140+
It is exemplified how to:
141+
* Send messages to a Google Cloud Pub/Sub topic through an outbound channel adapter;
142+
* Receive and process messages from a Google Cloud Pub/Sub subscription through an inbound channel
143+
adapter;
144+
* Create new Google Cloud Pub/Sub topics and subscriptions through the Pub/Sub admin utility.

spring/pubsub/src/main/java/com/example/spring/pubsub/PubsubApplication.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ public class PubsubApplication {
4545
public static void main(String[] args) throws IOException {
4646
SpringApplication.run(PubsubApplication.class, args);
4747
}
48+
49+
// Inbound channel adapter.
50+
4851
/**
4952
* Spring channel for incoming messages from Google Cloud Pub/Sub.
5053
*
@@ -115,6 +118,8 @@ public MessageHandler messageReceiver2() {
115118
};
116119
}
117120

121+
// Outbound channel adapter
122+
118123
/**
119124
* The outbound channel adapter to write messages from a Spring channel to a Google Cloud Pub/Sub
120125
* topic.

spring/pubsub/src/main/java/com/example/spring/pubsub/WebAppController.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ public class WebAppController {
3737
@Autowired
3838
private PubsubOutboundGateway messagingGateway;
3939

40-
@Autowired private PubsubAdmin admin;
40+
@Autowired
41+
private PubsubAdmin admin;
4142

4243
/**
4344
* Lists every topic in the project.

spring/pubsub/src/main/resources/application.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@
22
#spring.cloud.gcp.credentialsLocation=file:[LOCAL_PATH_TO_CREDENTIALS]
33
#
44
#spring.cloud.gcp.pubsub.subscriber.executorThreads=[SUBSCRIBER_THREADS]
5+
#spring.cloud.gcp.pubsub.subscriber.ackDeadline=[ACK_DEADLINE_SECONDS]
56
#spring.cloud.gcp.pubsub.publisher.executorThreads=[PUBLISHER_THREADS]

0 commit comments

Comments
 (0)