tHe0rAL
tHe0rAL

Reputation: 1

spring cloud stream - gcp pubsub binder maxFetchSize?

try to use spring cloud gcp binder library polling messages from GCP pubsub topic. ref from streaming-vs-polled-input. use spring.cloud.stream.gcp.pubsub.default.consumer.maxFetchSize poll get N message (maxFetchSize's value) at a time, but even i set up the property and set maxFetchSize to 2 or others. i alwasy got 1 message from topic, even the topic has other messages. Does anyone here has other idea?

# application.properties
spring.cloud.gcp.pubsub.project-id= 
spring.cloud.gcp.credentials.location= 
spring.cloud.stream.default-binder=pubsub
spring.cloud.stream.bindings.input.destination=iamatopic
spring.cloud.stream.bindings.input.content-type=text/plain;charset=UTF-8
spring.cloud.stream.gcp.pubsub.default.consumer.maxFetchSize=3

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.binder.PollableMessageSource;

public interface PollableSink {
    @Input("input")
    PollableMessageSource input();
}

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binder.PollableMessageSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.scheduling.annotation.Scheduled;

import com.google.cloud.spring.pubsub.support.AcknowledgeablePubsubMessage;
import com.google.cloud.spring.pubsub.support.GcpPubSubHeaders;

import lombok.extern.slf4j.Slf4j;

@Configuration
@EnableBinding(PollableSink.class)
@Slf4j
public class PollConfiguration {

    @Autowired
    PollableMessageSource destIn;

    PolledMessageHandler messageHandler = new PolledMessageHandler();

    @Scheduled(fixedRate = 5000)
    public void poller() {
        log.info("start polling ");
        destIn.poll(this.messageHandler, ParameterizedTypeReference.forType(String.class));
        log.info("end polling ");
    }

    static class PolledMessageHandler implements MessageHandler {
        @Override
        public void handleMessage(Message<?> message) {
            AcknowledgeablePubsubMessage ackableMessage = (AcknowledgeablePubsubMessage) message.getHeaders()
                    .get(GcpPubSubHeaders.ORIGINAL_MESSAGE);
            ackableMessage.ack();

            System.out.println("get payload : " + message.getPayload());
        }
    }
}

Upvotes: 0

Views: 691

Answers (1)

Elena Felder
Elena Felder

Reputation: 466

Try setting spring.cloud.stream.poller.maxMessagesPerPoll value -- it's 1 by default in Spring Cloud Stream.

Reference: https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_polling_configuration_properties

Upvotes: 0

Related Questions