I'm using CDC in my application.
The CDC source always gives me the data as a stream, i.e. a StreamSource<ChangeRecord> source.
My requirements:
1] Capture the DB changes using CDC and store them in a map.
2] In the next step, I will do batch processing based on user input.
Here is my code.
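import com.hazelcast.jet.cdc.ChangeRecord;
import com.hazelcast.jet.cdc.postgres.PostgresCdcSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.jet.pipeline.StreamSource;

public Pipeline returnPiple() {
    StreamSource<ChangeRecord> source = PostgresCdcSources.postgres("source")
            .setCustomProperty("plugin.name", "pgoutput").setDatabaseAddress("127.0.0.1").setDatabasePort(5432)
            .setDatabaseUser("postgres").setDatabasePassword("root").setDatabaseName("postgres")
            .setTableWhitelist("tblName").build();
    Pipeline pipeline = Pipeline.create();
    // Streaming branch: CDC events -> mapStore (deletedFalse is a filter predicate defined elsewhere)
    pipeline.readFrom(source).withoutTimestamps().filter(deletedFalse)
            .writeTo(Sinks.map("mapStore", e -> e.key(), e -> e.value()));
    // Batch branch: from here I will be doing batch operations based on user input
    pipeline.readFrom(Sources.map("mapStore")).writeTo(Sinks.logger());
    return pipeline;
}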
When I try to read data from mapStore, I get null.
So how can I do batch processing on the data coming from CDC?
Split the code into two pipelines and submit them separately.
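import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

// jetService is a JetService, e.g. obtained via hazelcastInstance.getJet()
Pipeline cdcPipeline = Pipeline.create();
cdcPipeline
        .readFrom(source)
        .withoutTimestamps()
        .filter(deletedFalse)
        .writeTo(Sinks.map("mapStore", e -> e.key(), e -> e.value()));
// newJobIfAbsent takes a JobConfig and only deduplicates named jobs; "cdc-to-mapStore" is an example name
jetService.newJobIfAbsent(cdcPipeline, new JobConfig().setName("cdc-to-mapStore"));
// now Jet streams data from your Postgres DB to the IMap in the background
Pipeline batchQuery = Pipeline.create();
batchQuery
        .readFrom(Sources.map("mapStore"))
        .writeTo(Sinks.logger());
// runs your query once, prints the results and waits for the job to complete
jetService.newJob(batchQuery).join();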
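You have two sub-pipelines in one pipeline. The first is a streaming one: it streams data from the CDC source to the mapStore IMap. The second reads the mapStore IMap and writes it to the logger. However, the second one is a batch sub-pipeline: it reads the map once and completes, so it doesn't pick up events that arrive from CDC afterwards. Because both branches start together when the job is submitted, the batch branch most likely reads the map before the CDC branch has written anything to it.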