Reputation: 48356
I want to run a periodic job (like a cron job) every ten minutes that loads data from PostgreSQL. GenerateSequence is used to implement it; here is the code.
PostgresOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PostgresOptions.class);

// create pipeline
Pipeline pipeline = Pipeline.create(options);
PGSimpleDataSource pgDataSource = getPostgresDataSource(options);

// run sequence
PCollection<KV<String, Float>> sequence = pipeline.apply(
        "Generate Sequence",
        GenerateSequence.from(0).withRate(1, Duration.standardMinutes(10))
    ).apply(
        "Read Postgresql",
        ParDo.of(new ReadPG(pgDataSource))
    );
...
private static class ReadPG extends DoFn<Long, KV<String, Float>> {
    private PGSimpleDataSource pgDataSource;

    public ReadPG(PGSimpleDataSource source) {
        pgDataSource = source;
    }

    @ProcessElement
    public void processElement(@Element Long seq, final OutputReceiver<KV<String, Float>> receiver) {
        JdbcIO.<KV<String, Float>>read()
            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(this.pgDataSource))
            .withCoder(KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(Float.class)))
            .withRowMapper(new JdbcIO.RowMapper<KV<String, Float>>() {
                public KV<String, Float> mapRow(ResultSet resultSet) throws Exception {
                    KV<String, Float> kv = KV.of(resultSet.getString("id"), resultSet.getFloat("key1"));
                    while (resultSet.next()) {
                        System.out.println(resultSet.getString("id"));
                    }
                    receiver.output(kv);
                    return kv;
                }
            })
            .withQuery("select * from table_name limit 10;");
    }
}
However, no data is loaded from PostgreSQL. Is there anything wrong with my code?
Beam version: 2.15.0
Upvotes: 0
Views: 353
Reputation: 48356
Finally, the periodic job was implemented with GenerateSequence and Window.
Here is sample code:
pipeline.apply(
        "Generate Sequence",
        GenerateSequence.from(0).withRate(1, Duration.standardMinutes(WindowInterval))
    )
    .apply(Window.into(FixedWindows.of(Duration.standardMinutes(WindowInterval))))
    .apply("Read Data from PG", new ReadPG(pgDataSource))
Upvotes: 0
Reputation: 1443
Any Beam PTransform has to be part of the pipeline; you can't use it inside processElement(), since it would not be translated by the runner as part of the DAG. If you are looking for a composite transform, that can be achieved by overriding the expand() method of another PTransform.
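As a rough illustration (not from your code), a composite transform that keeps JdbcIO.read() inside the pipeline graph could look like this; the class name, query, and columns are only examples based on your question:

// Hypothetical composite transform: JdbcIO.read() is applied in expand(), so it becomes part of the DAG.
static class ReadFromPostgres extends PTransform<PBegin, PCollection<KV<String, Float>>> {
    private final PGSimpleDataSource dataSource;

    ReadFromPostgres(PGSimpleDataSource dataSource) {
        this.dataSource = dataSource;
    }

    @Override
    public PCollection<KV<String, Float>> expand(PBegin input) {
        return input.apply(
            JdbcIO.<KV<String, Float>>read()
                .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
                .withCoder(KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(Float.class)))
                .withQuery("select id, key1 from table_name limit 10")
                .withRowMapper(resultSet -> KV.of(resultSet.getString("id"), resultSet.getFloat("key1"))));
    }
}

It would then be applied at pipeline construction time, e.g. pipeline.apply(new ReadFromPostgres(pgDataSource)), rather than inside a DoFn.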
At the same time, Beam is not intended to schedule jobs to run on your backend processing engine, so scheduling should be done outside of Beam. Some options are mentioned here.
Upvotes: 2