Reputation: 3212
I have a Google Cloud Pub/Sub subscriber application (the question is not specific to Pub/Sub; I believe a JMS listener behaves the same way). It doesn't require a web container such as Tomcat (there are plenty of questions where the problem was a missing container dependency, but here I don't need a web container). It should start and process messages. The problem: it stops right after it starts.
The application is very simple. Here is the build.gradle
file:
// Gradle build for the subscriber application.
buildscript {
    ext {
        // Single place to pin the Spring Boot release used by the Gradle plugin below.
        springBootVersion = '1.5.8.RELEASE'
    }
    repositories {
        mavenLocal()
        mavenCentral()
    }
    dependencies {
        classpath "org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}"
    }
}

apply(plugin: 'java')
apply(plugin: 'eclipse')
apply(plugin: 'idea')
apply(plugin: 'org.springframework.boot')

// Compile for and target Java 8.
sourceCompatibility = 1.8
targetCompatibility = 1.8

repositories {
    mavenLocal()
    mavenCentral()
}

dependencies {
    // Only the core starter is declared; no web starter is pulled in by this script.
    compile("org.springframework.boot:spring-boot-starter")
}
And here is application launcher:
/**
 * Entry point of the subscriber application.
 *
 * <p>The application runs without a web container, so after the Spring context
 * has started nothing else keeps the JVM alive. {@link #main(String[])}
 * therefore blocks the main thread until the process is asked to stop.
 */
@SpringBootApplication
public class CommunicationApplication {

    /**
     * Starts the Spring context and then keeps the process running.
     *
     * @param args command-line arguments forwarded to Spring Boot
     * @throws IOException kept in the signature for backward compatibility
     */
    public static void main(String[] args) throws IOException {
        SpringApplication.run(CommunicationApplication.class, args);

        // Without this the JVM exits as soon as run() returns (the observed
        // "stops after start" behaviour). Blocking on System.in is not a
        // reliable substitute: presumably stdin is closed or absent in the
        // cloud environment, so read() returns immediately there.
        // Joining the current thread blocks until the thread is interrupted;
        // JVM shutdown (e.g. SIGTERM) still runs the registered shutdown hooks.
        try {
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and let main return normally.
            Thread.currentThread().interrupt();
        }
    }
}
And here is the Pub/Sub manager bean (it creates the Pub/Sub listener):
/**
 * Spring bean that makes sure the topic and subscription exist and then
 * starts a {@link Subscriber} that hands incoming messages to the injected
 * {@link MessageReceiver}.
 */
@Component
public class PubsubManager {

    // Topic id used for both topic and subscription creation.
    // NOTE(review): the subscription id comes from MessagingConstants while the
    // topic id is a string literal — confirm whether a matching constant exists.
    private static final String TOPIC_ID = "COMMUNICATION_TOPIC_NAME";

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private MessageReceiver messageReceiver;

    // Stays null until createSubscriber() has built the subscriber.
    private Subscriber subscriber;

    /**
     * Creates the topic and subscription (if missing) and starts receiving.
     *
     * @throws Exception if an admin client cannot be created/closed for the topic
     *     or the subscriber fails to start
     */
    @PostConstruct
    protected void start() throws Exception {
        createTopic();
        createSubscriber();
    }

    /**
     * Creates the topic; an {@code ALREADY_EXISTS} response is treated as success,
     * any other API error is logged and start-up continues.
     */
    private void createTopic() throws Exception {
        TopicName topic = TopicName.of(ServiceOptions.getDefaultProjectId(), TOPIC_ID);
        try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
            topicAdminClient.createTopic(topic);
            logger.info("Topic {}:{} created", topic.getProject(), topic.getTopic());
        } catch (ApiException e) {
            if (e.getStatusCode().getCode() == StatusCode.Code.ALREADY_EXISTS) {
                logger.info("Topic {}:{} already exist", topic.getProject(), topic.getTopic());
            } else {
                // Pass the exception as the last argument so the stack trace is kept.
                logger.error("Error create topic: {}", e.getStatusCode().getCode(), e);
            }
            logger.info("isRetryable: {}", e.isRetryable());
        }
    }

    /**
     * Creates the subscription (best effort) and starts the asynchronous subscriber.
     *
     * <p>Subscription creation failures are logged but do not abort start-up;
     * only {@code ALREADY_EXISTS} is reported as the benign "already created" case.
     */
    private void createSubscriber() throws Exception {
        SubscriptionName subscriptionName = SubscriptionName.of(ServiceOptions.getDefaultProjectId(),
                MessagingConstants.COMMUNICATION_SUBSCRIPTION_NAME);
        try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
            TopicName topicName = TopicName.of(ServiceOptions.getDefaultProjectId(), TOPIC_ID);
            subscriptionAdminClient.createSubscription(
                    subscriptionName, topicName, PushConfig.getDefaultInstance(), 0);
        } catch (ApiException e) {
            if (e.getStatusCode().getCode() == StatusCode.Code.ALREADY_EXISTS) {
                logger.info("subscription already created: {}", subscriptionName);
            } else {
                logger.error("Error create subscription: {}", e.getStatusCode().getCode(), e);
            }
        } catch (Exception e) {
            // Still best effort, but no longer mislabelled as "already created".
            logger.error("Error create subscription", e);
        }
        // Create a subscriber bound to the asynchronous message receiver.
        subscriber = Subscriber.newBuilder(subscriptionName, messageReceiver).build();
        subscriber.startAsync().awaitRunning();
        logger.info("subscriber start receiving messages");
    }

    /**
     * Asks the subscriber to stop when the bean is destroyed.
     */
    @PreDestroy
    protected void close() throws Exception {
        if (subscriber == null) {
            // Start-up never got as far as building the subscriber.
            return;
        }
        logger.info("subscriber stop receiving messages");
        subscriber.stopAsync();
    }
}
If I uncomment System.in.read();
- it works in my local environment, but in the cloud it doesn't work. Could you suggest a proper way to fix this? Thanks.
Upvotes: 1
Views: 1125
Reputation: 1916
If you want to create only the PubSub Listener, the following worked for me:
git clone https://github.com/spring-guides/gs-messaging-gcp-pubsub
cd gs-messaging-gcp-pubsub/complete
cd src/main/resources
Once in there, you can delete the static folder.
build.gradle:
// Gradle build for the Pub/Sub listener sample.
buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:1.5.9.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'

jar {
    baseName = 'gs-spring-cloud-gcp'
    version = '0.1.0'
}

repositories {
    mavenCentral()
    maven {
        // Use HTTPS: artifacts fetched over plain HTTP can be tampered with in
        // transit, and newer Gradle versions refuse insecure repository URLs.
        url "https://repo.spring.io/snapshot"
    }
}

sourceCompatibility = 1.8
targetCompatibility = 1.8

dependencies {
    // Snapshot builds, resolved from the snapshot repository declared above.
    compile("org.springframework.cloud:spring-cloud-gcp-starter-pubsub:1.0.0.BUILD-SNAPSHOT")
    compile("org.springframework.cloud:spring-integration-gcp:1.0.0.BUILD-SNAPSHOT")
}
PubSubApplication.java:
package hello;
import java.io.IOException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubOperations;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.support.GcpHeaders;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.gcp.pubsub.AckMode;
import org.springframework.integration.gcp.pubsub.inbound.PubSubInboundChannelAdapter;
import org.springframework.integration.gcp.pubsub.outbound.PubSubMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
 * Spring Boot application that wires a Pub/Sub inbound channel adapter to a
 * message handler which logs each payload and acknowledges the message.
 */
@SpringBootApplication
public class PubSubApplication {

    private static final Log LOGGER = LogFactory.getLog(PubSubApplication.class);

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     * @throws IOException declared by the signature
     */
    public static void main(String[] args) throws IOException {
        SpringApplication.run(PubSubApplication.class, args);
    }

    /**
     * Channel that carries messages from the inbound adapter to the handler.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel pubsubInputChannel() {
        MessageChannel channel = new DirectChannel();
        return channel;
    }

    /**
     * Inbound adapter for the {@code "testSubscription"} subscription.
     *
     * @param inputChannel channel the adapter publishes received messages to
     * @param pubSubTemplate Pub/Sub operations used by the adapter
     * @return the configured adapter, set to manual acknowledgement
     */
    @Bean
    public PubSubInboundChannelAdapter messageChannelAdapter(
            @Qualifier("pubsubInputChannel") MessageChannel inputChannel,
            PubSubOperations pubSubTemplate) {
        PubSubInboundChannelAdapter inbound =
                new PubSubInboundChannelAdapter(pubSubTemplate, "testSubscription");
        // Manual mode: the handler below is responsible for calling ack().
        inbound.setAckMode(AckMode.MANUAL);
        inbound.setOutputChannel(inputChannel);
        return inbound;
    }

    /**
     * Handler attached to {@code pubsubInputChannel}: logs the payload, then
     * acknowledges the message through the consumer found in its headers.
     *
     * @return the message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "pubsubInputChannel")
    public MessageHandler messageReceiver() {
        return incoming -> {
            Object payload = incoming.getPayload();
            LOGGER.info("Message arrived! Payload: " + payload);
            Object ackHeader = incoming.getHeaders().get(GcpHeaders.ACKNOWLEDGEMENT);
            ((AckReplyConsumer) ackHeader).ack();
        };
    }
}
Change "testSubscription" to the subscription you are using.
Now, from the folder gs-messaging-gcp-pubsub/complete, if you run ./gradlew bootRun, the app hello.PubSubApplication should start running locally. Any message you publish to the topic you are subscribed to should appear in the console where the app is running.
If you have problems with credentials/authentication, change the parameters in:
gs-messaging-gcp-pubsub/complete/src/main/resources/application.properties
Upvotes: 1