vivek

Reputation: 145

StreamSource vs BatchStage in Hazelcast Jet

I'm using CDC (change data capture) in my application.

CDC always delivers its output as a stream: StreamSource<ChangeRecord>.

My requirement:

1) Capture the DB changes using CDC and store them in a map.

2) In the next step, run a batch process on that map based on user input data.

Here is my code.

public Pipeline returnPiple() {

        StreamSource<ChangeRecord> source = PostgresCdcSources.postgres("source")
                .setCustomProperty("plugin.name", "pgoutput")
                .setDatabaseAddress("127.0.0.1")
                .setDatabasePort(5432)
                .setDatabaseUser("postgres")
                .setDatabasePassword("root")
                .setDatabaseName("postgres")
                .setTableWhitelist("tblName")
                .build();

        Pipeline pipeline = Pipeline.create();

        // this object as stream
        pipeline.readFrom(source).withoutTimestamps().filter(deletedFalse)
                .writeTo(Sinks.map("mapStore", e -> e.key(), e -> e.value()));

        // from here I will be doing batch operation based on user i/p
        pipeline.readFrom(Sources.map("mapStore")).writeTo(Sinks.logger());

        return pipeline;
    }

When I try to read data from mapStore, I get null.

So how do I do batch processing on data coming from CDC?

Upvotes: 0

Views: 173

Answers (2)

Vlado Schreiner

Reputation: 458

Split the code into two pipelines and submit them separately.

Pipeline cdcPipeline = Pipeline.create();

cdcPipeline
    .readFrom(source)
    .withoutTimestamps()
    .filter(deletedFalse)
    .writeTo(Sinks.map("mapStore", e -> e.key(), e -> e.value()));

// newJobIfAbsent requires a named JobConfig (the name "cdc-to-map" is illustrative)
jetService.newJobIfAbsent(cdcPipeline, new JobConfig().setName("cdc-to-map"));

// now Jet streams data from your Postgres DB to the IMap in the background

Pipeline batchQuery = Pipeline.create();
batchQuery
    .readFrom(Sources.map("mapStore"))
    .writeTo(Sinks.logger());

// runs your query once and prints the results
jetService.newJobIfAbsent(batchQuery, new JobConfig().setName("batch-query")).join();
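For completeness, a minimal sketch of how jetService above can be obtained, assuming an embedded Hazelcast 5.x member (variable names are illustrative; on Jet 4.x you would use Jet.bootstrappedInstance() instead):

```java
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.JetService;

// Start (or attach to) a Hazelcast member and get the Jet service from it.
HazelcastInstance hz = Hazelcast.bootstrappedInstance();
JetService jetService = hz.getJet();
```

This sketch needs a running Hazelcast member, so it is not runnable standalone.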

Upvotes: 0

Oliv

Reputation: 10812

You have two sub-pipelines in your pipeline. The first one is a streaming pipeline: it streams data from the CDC source to the mapStore IMap. The second one reads the mapStore IMap and writes it to the logger. However, the second one is a batch job: it reads the map once and completes, so it doesn't pick up events arriving from the CDC source afterwards.
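If the second stage should keep reacting to new CDC events instead of reading the map once, one option is to read the map's event journal as a stream. A minimal sketch, assuming the event journal has been enabled for mapStore in the cluster configuration (the pipeline name is illustrative):

```java
import com.hazelcast.jet.pipeline.JournalInitialPosition;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

// Streams map update events continuously instead of taking a one-off snapshot.
Pipeline continuous = Pipeline.create();
continuous
        .readFrom(Sources.mapJournal("mapStore", JournalInitialPosition.START_FROM_OLDEST))
        .withoutTimestamps()
        .writeTo(Sinks.logger());
```

Unlike Sources.map, this source never completes, so the job keeps logging new entries as CDC writes them. It requires a running cluster with the event journal configured, so it is not runnable standalone.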

Upvotes: 0
