zangw

Reputation: 48356

How to run a periodic job in Beam?

I want to run a periodic job (like a cron job) every ten minutes that loads data from PostgreSQL. I used GenerateSequence to implement it; here is the code.

    PostgresOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PostgresOptions.class);

    // create pipeline
    Pipeline pipeline = Pipeline.create(options);
    PGSimpleDataSource pgDataSource = getPostgresDataSource(options);

    // run sequence
    PCollection<KV<String, Float>> sequence = pipeline.apply(
            "Generate Sequence",
            GenerateSequence.from(0).withRate(1, Duration.standardMinutes(10))
    ).apply(
            "Read Postgresql",
            ParDo.of(new ReadPG(pgDataSource))
    );

...

    private static class ReadPG extends DoFn<Long, KV<String, Float>> {
        private PGSimpleDataSource pgDataSource;
        public ReadPG(PGSimpleDataSource source) {
            pgDataSource = source;
        }

        @ProcessElement
        public void processElement( @Element Long seq, final OutputReceiver<KV<String, Float>> receiver) {
            JdbcIO.<KV<String, Float>>read()
                .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(this.pgDataSource))
                .withCoder(KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(Float.class)))
                .withRowMapper(new JdbcIO.RowMapper<KV<String, Float>>() {
                    public KV<String, Float> mapRow(ResultSet resultSet) throws Exception {
                        KV<String, Float> kv = KV.of(resultSet.getString("id"), resultSet.getFloat("key1"));
                        while (resultSet.next()) {
                            System.out.println(resultSet.getString("id"));
                        }
                        receiver.output(kv);

                        return kv;
                    }
                })
                .withQuery("select * from table_name limit 10;");
        }
    }

However, no data is loaded from PostgreSQL. Is there anything wrong with my code?

Beam version: 2.15.0

Upvotes: 0

Views: 353

Answers (2)

zangw

Reputation: 48356

In the end, I got the periodic job working with GenerateSequence and Window.

Here is the sample code:

        pipeline.apply(
                "Generate Sequence",
                GenerateSequence.from(0).withRate(1, Duration.standardMinutes(WindowInterval))
        )
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(WindowInterval))))
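        // ReadPG is applied directly here, so it must be a PTransform rather than the DoFn from the question
        .apply("Read Data from PG", new ReadPG(pgDataSource));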

Upvotes: 0

Alexey Romanenko

Reputation: 1443

Any Beam PTransform has to be part of the pipeline. You can't use one inside processElement(), because the runner won't translate it as part of the DAG. In your code, JdbcIO.read() only builds a transform object that is never applied to anything, so the query never runs. If you are looking for a composite transform, you can build one by overriding the expand() method of another PTransform.
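As a rough illustration of the composite-transform idea, here is a minimal sketch that wraps GenerateSequence and JdbcIO.readAll() in one PTransform. The class name PeriodicPostgresRead and its constructor parameters are made up for this example, and the column names are taken from the question. It assumes JdbcIO.readAll() is available in your Beam version; I have not run it against a real database.

```java
import java.sql.ResultSet;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Hypothetical composite transform: emits one tick per interval and
// runs the same query once for every tick.
class PeriodicPostgresRead extends PTransform<PBegin, PCollection<KV<String, Float>>> {
    private final JdbcIO.DataSourceConfiguration config;
    private final String query;
    private final long intervalMinutes;

    PeriodicPostgresRead(JdbcIO.DataSourceConfiguration config, String query, long intervalMinutes) {
        this.config = config;
        this.query = query;
        this.intervalMinutes = intervalMinutes;
    }

    @Override
    public PCollection<KV<String, Float>> expand(PBegin input) {
        return input
            // Unbounded source: one Long element every intervalMinutes.
            .apply("Generate Sequence",
                GenerateSequence.from(0).withRate(1, Duration.standardMinutes(intervalMinutes)))
            // readAll() is itself a PTransform, so it becomes part of the DAG.
            .apply("Read Postgresql",
                JdbcIO.<Long, KV<String, Float>>readAll()
                    .withDataSourceConfiguration(config)
                    .withQuery(query)
                    // The query has no placeholders, so the tick value is ignored.
                    .withParameterSetter((tick, statement) -> { })
                    // Called once per row; do not call rs.next() here.
                    .withRowMapper((ResultSet rs) -> KV.of(rs.getString("id"), rs.getFloat("key1")))
                    .withCoder(KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(Float.class))));
    }
}
```

You would then apply it at the root of the pipeline, for example with `pipeline.apply(new PeriodicPostgresRead(JdbcIO.DataSourceConfiguration.create("org.postgresql.Driver", "jdbc:postgresql://localhost:5432/mydb"), "select * from table_name limit 10", 10))`, where the JDBC URL is a placeholder.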

At the same time, Beam is not intended to schedule jobs on your backend processing engine, so scheduling should be done outside of Beam. Some options are mentioned here.
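To illustrate what scheduling outside of Beam could look like, here is a small sketch that uses only the JDK's ScheduledExecutorService. It is just one possible approach (cron or a workflow scheduler would do the same job), and the class name PeriodicLauncher is invented for this example.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: triggers a job at a fixed rate from outside the pipeline.
final class PeriodicLauncher {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    // Runs the job immediately and then once per period.
    void start(Runnable job, long period, TimeUnit unit) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                job.run();
            } catch (RuntimeException e) {
                // An uncaught exception would cancel all later runs, so log and continue.
                System.err.println("Periodic job failed: " + e);
            }
        }, 0, period, unit);
    }

    // Stops scheduling and interrupts a run that is in progress.
    void stop() {
        scheduler.shutdownNow();
    }
}
```

The job passed to start() would typically build a bounded pipeline (for instance one that uses JdbcIO.read()) and call pipeline.run().waitUntilFinish(), so each run is an ordinary batch job, e.g. `launcher.start(job, 10, TimeUnit.MINUTES)`.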

Upvotes: 2
