killernerd

Reputation: 377

Pub/Sub messages not being pulled with poller and service activator

I've been trying to get Pub/Sub to work within a Spring application. To get up and running, I've been reading through tutorials and documentation like this

I can get things to build and start, but if I go through the Cloud Console to send a message to the test subscription, it never arrives.

This is what my code looks like right now:

@Configuration
@Import({GcpPubSubAutoConfiguration.class})
public class PubSubConfigurator {

    @Bean
    public GcpProjectIdProvider projectIdProvider() {
        return () -> "project-id";
    }

    @Bean
    public CredentialsProvider credentialsProvider() {
        return GoogleCredentials::getApplicationDefault;
    }

    @Bean
    public MessageChannel inputMessageChannel() {
        return new PublishSubscribeChannel();
    }

    @Bean
    @InboundChannelAdapter(channel = "inputMessageChannel", poller = @Poller(fixedDelay = "5"))
    public MessageSource<Object> pubsubAdapter(PubSubTemplate pubSubTemplate) {
        PubSubMessageSource messageSource = new PubSubMessageSource(pubSubTemplate, "tst-sandbox");
        messageSource.setAckMode(AckMode.MANUAL);
        messageSource.setPayloadType(String.class);
        messageSource.setBlockOnPull(false);
        messageSource.setMaxFetchSize(10);
        //pubSubTemplate.pull("tst-sandbox", 10, true);
        return messageSource;
    }

    // Define what happens to the messages arriving in the message channel.
    @ServiceActivator(inputChannel = "inputMessageChannel")
    public void messageReceiver(
            String payload,
            @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
        System.out.println("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);
        message.ack();
    }
}

My thinking was that the @Poller annotation would start a poller that runs every so often to check for messages and sends them to the method annotated with @ServiceActivator, but this is clearly not the case, as that method is never hit.

Interestingly enough, if I put a breakpoint right before "return messageSource" and check the result of the commented-out pubSubTemplate.pull call, the messages ARE returned, so it is seemingly not an issue with the connection itself.

What am I missing here? Tutorials and documentation aren't helping much at this point as they all use pretty much the same bit of tutorial code like above...

I have tried variations of the above code, such as creating the adapter instead of the message source, like so:

@Bean
public PubSubInboundChannelAdapter inboundChannelAdapter(
        @Qualifier("inputMessageChannel") MessageChannel messageChannel,
        PubSubTemplate pubSubTemplate) {
    PubSubInboundChannelAdapter adapter =
            new PubSubInboundChannelAdapter(pubSubTemplate, "tst-sandbox");
    adapter.setOutputChannel(messageChannel);
    adapter.setAckMode(AckMode.MANUAL);
    adapter.setPayloadType(String.class);
    return adapter;
}

to no avail. Any suggestions are welcome.

Upvotes: 0

Views: 996

Answers (1)

killernerd

Reputation: 377

I found the problem after creating a Spring Boot project from scratch (the main project is plain Spring). I noticed in the debug output that the Boot project was auto-starting the service activator bean and doing some other things, like actually subscribing to the channels, which the main project wasn't doing.

After a quick Google search, the solution turned out to be simple. I had to add the

@EnableIntegration

annotation at class level, and the messages started coming in.
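
For anyone in the same plain (non-Boot) Spring situation, here is a minimal sketch of where the annotation goes. It reuses the class and channel bean names from the question. The imports shown are the usual Spring Framework and Spring Integration ones, and the Pub/Sub beans are left out on purpose because their package names depend on the Spring Cloud GCP version in use. As far as I understand it, @EnableIntegration registers the Spring Integration infrastructure that processes annotations such as @InboundChannelAdapter and @ServiceActivator, which Spring Boot's auto-configuration otherwise sets up for you.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.MessageChannel;

@Configuration
@EnableIntegration // the missing piece in a non-Boot Spring application
public class PubSubConfigurator {

    // Same channel bean as in the question.
    @Bean
    public MessageChannel inputMessageChannel() {
        return new PublishSubscribeChannel();
    }

    // The @Import, the remaining beans and the @ServiceActivator method
    // from the question stay unchanged and are omitted from this sketch.
}
```

With the annotation in place, the poller declared on the inbound channel adapter should actually be scheduled, and the service activator should be subscribed to inputMessageChannel.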

Upvotes: 0
