Yunus Einsteinium
Yunus Einsteinium

Reputation: 1180

Google PubSub with Dataflow using Spring Integration

Developing a realtime tracking system using Spring framework and Google Cloud Platform. Spring Cloud GCP enables to easily write a GCP PubSub application Spring Integration way. From their github page i was able to write the following application like: Github Samples

@Configuration
@Slf4j
public class GCPConfiguration {
    /*
    *   Message sender code
    * */
    @Bean
    @ServiceActivator(inputChannel = "pubSubOutputChannel")
    public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
        PubSubMessageHandler adapter =
                new PubSubMessageHandler(pubsubTemplate, "exampleTopic");
        adapter.setPublishCallback(new ListenableFutureCallback<String>() {
            @Override
            public void onFailure(Throwable ex) {
                log.info("There was an error sending the message.");
            }

            @Override
            public void onSuccess(String result) {
                log.info("Message was sent successfully.");
            }
        });

        return adapter;
    }

    @MessagingGateway(defaultRequestChannel = "pubSubOutputChannel")
    public interface PubSubOutboundGateway {
        void sendToPubSub(String text);
    }

    /*
    *   Message receiver code
    * */
    @Bean
    public MessageChannel pubsubInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public PubSubInboundChannelAdapter messageChannelAdapter(
            @Qualifier("pubsubInputChannel") MessageChannel inputChannel,
            PubSubOperations pubSubTemplate) {
        PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, "exampleSubscription");
        adapter.setOutputChannel(inputChannel);
        adapter.setAckMode(AckMode.MANUAL);
        return adapter;
    }

    @ServiceActivator(inputChannel = "pubsubInputChannel")
    public void messageReceiver(String payload, @Header(GcpHeaders.ACKNOWLEDGEMENT) AckReplyConsumer ackReplyConsumer) {
        log.info("Message arrived! Payload: " + payload);

        ackReplyConsumer.ack();
    }
}

Tracking devices will continuously be sending data to a TCP port exposed by this application which needs to be transformed then persisted to BigQuery and GC SQL. Getting data from TCP port and publishing it to GC PubSub is already in place. What i don't know is how and where to add Google Cloud Dataflow code that is coming from GC PubSub

Update

The goal is to insert data to GC BigQuery and GC SQL, so answer that will lead to data inserted in those services is fine.

Upvotes: 2

Views: 1120

Answers (1)

MattL
MattL

Reputation: 163

It looks like your question is how to use Google Cloud Dataflow to stream from Pub/Sub to Bigquery.

The Dataflow site links to this example from its examples page. Note that this is using version 1.x of the SDK and not the 2.x Apache Beam version. You can find a similar example here.

There is also a Google-provided template that can be used and requires no coding.

Edit: the template was recently open-sourced and is available on github.

Upvotes: 1

Related Questions