Reputation: 1
try to use spring cloud gcp binder library polling messages from GCP pubsub topic. ref from streaming-vs-polled-input. use spring.cloud.stream.gcp.pubsub.default.consumer.maxFetchSize poll get N message (maxFetchSize's value) at a time, but even i set up the property and set maxFetchSize to 2 or others. i alwasy got 1 message from topic, even the topic has other messages. Does anyone here has other idea?
# application.properties
spring.cloud.gcp.pubsub.project-id=
spring.cloud.gcp.credentials.location=
spring.cloud.stream.default-binder=pubsub
spring.cloud.stream.bindings.input.destination=iamatopic
spring.cloud.stream.bindings.input.content-type=text/plain;charset=UTF-8
spring.cloud.stream.gcp.pubsub.default.consumer.maxFetchSize=3
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.binder.PollableMessageSource;
public interface PollableSink {
@Input("input")
PollableMessageSource input();
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binder.PollableMessageSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.scheduling.annotation.Scheduled;
import com.google.cloud.spring.pubsub.support.AcknowledgeablePubsubMessage;
import com.google.cloud.spring.pubsub.support.GcpPubSubHeaders;
import lombok.extern.slf4j.Slf4j;
@Configuration
@EnableBinding(PollableSink.class)
@Slf4j
public class PollConfiguration {
@Autowired
PollableMessageSource destIn;
PolledMessageHandler messageHandler = new PolledMessageHandler();
@Scheduled(fixedRate = 5000)
public void poller() {
log.info("start polling ");
destIn.poll(this.messageHandler, ParameterizedTypeReference.forType(String.class));
log.info("end polling ");
}
static class PolledMessageHandler implements MessageHandler {
@Override
public void handleMessage(Message<?> message) {
AcknowledgeablePubsubMessage ackableMessage = (AcknowledgeablePubsubMessage) message.getHeaders()
.get(GcpPubSubHeaders.ORIGINAL_MESSAGE);
ackableMessage.ack();
System.out.println("get payload : " + message.getPayload());
}
}
}
Upvotes: 0
Views: 691
Reputation: 466
Try setting spring.cloud.stream.poller.maxMessagesPerPoll
value -- it's 1 by default in Spring Cloud Stream.
Upvotes: 0