naga

Reputation: 25

JDBC fetch from Oracle with Beam

The program below connects to Oracle 11g and fetches records. However, it throws a NullPointerException for the coder at pipeline.apply().

I have added ojdbc14.jar to the project dependencies.

public static void main(String[] args) {

    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    p.apply(JdbcIO.<KV<Integer, String>>read()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                "oracle.jdbc.driver.OracleDriver", "jdbc:oracle:thin:@hostdnsname:port/servicename")
            .withUsername("uname")
            .withPassword("pwd"))
        .withQuery("select EMPID,NAME from EMPLOYEE1")
        .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
            public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
                return KV.of(resultSet.getInt(1), resultSet.getString(2));
            }
        }));
    p.run();
}

It gives the error below. Any clue?

Exception in thread "main" java.lang.NullPointerException: coder
    at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:228)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$Read.validate(JdbcIO.java:283)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$Read.validate(JdbcIO.java:216)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:399)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:307)
    at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:47)
    at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:158)
    at org.apache.beam.examples.v030.JdbcUtil.main(JdbcUtil.java:21)

Upvotes: 1

Views: 2694

Answers (2)

Dpk Goyal

Reputation: 113

Try this.

pipeline.apply(JdbcIO.<KV<Integer, String>>read()
    .withCoder(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "com.mysql.jdbc.Driver", "jdbc:mysql://localhost:3306/deepakgoyal")
        .withUsername("root")
        .withPassword("root"))
    .withQuery("select empid, name from employee")
    .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
        public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
            return KV.of(resultSet.getInt(1), resultSet.getString(2));
        }
    }));

This example uses MySQL, so don't forget to add the MySQL connector JAR to your project. For Oracle, keep your own driver class and JDBC URL.

Upvotes: -1

Kenn Knowles

Reputation: 6023

Hi there!

Sorry the error message is not very helpful. The exception comes from a validation step: JdbcIO.Read checks that a coder has been set and fails when it is missing. I have filed BEAM-959 to improve this.
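
To show why the stack trace ends in a bare "NullPointerException: coder", here is a minimal sketch of that kind of validation check using only the JDK. The `Read` class and the `check` helper are hypothetical stand-ins, not Beam's actual code, and `Objects.requireNonNull` is used here in place of the Guava `Preconditions.checkNotNull` call that appears in the trace.

```java
import java.util.Objects;

public class ValidationSketch {

    /** Hypothetical stand-in for a read transform; this is not Beam's JdbcIO.Read. */
    static final class Read {
        private final String query;
        private final Object coder; // stays null until withCoder(...) is called

        Read(String query, Object coder) {
            this.query = query;
            this.coder = coder;
        }

        Read withQuery(String newQuery) {
            return new Read(newQuery, coder);
        }

        Read withCoder(Object newCoder) {
            return new Read(query, newCoder);
        }

        /** Fails with an NPE whose message is just the name of the missing field. */
        void validate() {
            Objects.requireNonNull(query, "query");
            Objects.requireNonNull(coder, "coder");
        }
    }

    /** Returns "ok" when validation passes, otherwise a description of the failure. */
    static String check(Read read) {
        try {
            read.validate();
            return "ok";
        } catch (NullPointerException e) {
            return "NullPointerException: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        Read withoutCoder = new Read(null, null)
            .withQuery("select EMPID,NAME from EMPLOYEE1");

        // No coder was set, so validation fails before anything runs.
        System.out.println(check(withoutCoder));

        // Any non-null coder satisfies this particular check.
        System.out.println(check(withoutCoder.withCoder("placeholder coder")));
    }
}
```

The first call reports the missing field by name only, which matches the shape of the message in the question; the second passes once a coder is supplied.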

You are required to provide a coder, for example:

.withCoder(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))

I have filed BEAM-960 so that this coder can be inferred automatically, as it is in most other places in Beam.
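
For readers wondering what the coder in `.withCoder(...)` is for: a coder tells the runner how to turn each element into bytes and back. The sketch below illustrates the idea of composing a key coder and a value coder into a pair coder, using only the JDK. `SimpleCoder`, `INT_CODER`, `STRING_CODER`, `kvCoder`, and `roundTrip` are hypothetical names for illustration; the code does not use Beam's `Coder` API and does not reproduce Beam's actual byte encoding.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class KvCoderSketch {

    /** Hypothetical minimal coder interface; Beam's real Coder class is richer. */
    interface SimpleCoder<T> {
        void encode(T value, DataOutputStream out) throws IOException;
        T decode(DataInputStream in) throws IOException;
    }

    /** Writes an Integer as four bytes (illustrative, not Beam's encoding). */
    static final SimpleCoder<Integer> INT_CODER = new SimpleCoder<Integer>() {
        public void encode(Integer value, DataOutputStream out) throws IOException {
            out.writeInt(value);
        }
        public Integer decode(DataInputStream in) throws IOException {
            return in.readInt();
        }
    };

    /** Writes a String in the JDK's length-prefixed modified UTF-8 form. */
    static final SimpleCoder<String> STRING_CODER = new SimpleCoder<String>() {
        public void encode(String value, DataOutputStream out) throws IOException {
            out.writeUTF(value);
        }
        public String decode(DataInputStream in) throws IOException {
            return in.readUTF();
        }
    };

    /** Builds a pair coder that writes the key first, then the value. */
    static <K, V> SimpleCoder<Map.Entry<K, V>> kvCoder(SimpleCoder<K> keyCoder,
                                                       SimpleCoder<V> valueCoder) {
        return new SimpleCoder<Map.Entry<K, V>>() {
            public void encode(Map.Entry<K, V> kv, DataOutputStream out) throws IOException {
                keyCoder.encode(kv.getKey(), out);
                valueCoder.encode(kv.getValue(), out);
            }
            public Map.Entry<K, V> decode(DataInputStream in) throws IOException {
                K key = keyCoder.decode(in);
                V value = valueCoder.decode(in);
                return new SimpleEntry<>(key, value);
            }
        };
    }

    /** Encodes a value to bytes and decodes it again with the same coder. */
    static <T> T roundTrip(SimpleCoder<T> coder, T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                coder.encode(value, out);
            }
            try (DataInputStream in =
                     new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return coder.decode(in);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        SimpleCoder<Map.Entry<Integer, String>> coder = kvCoder(INT_CODER, STRING_CODER);
        Map.Entry<Integer, String> row = new SimpleEntry<>(7, "Ada");
        Map.Entry<Integer, String> copy = roundTrip(coder, row);
        System.out.println(copy.getKey() + " -> " + copy.getValue());
    }
}
```

The composition mirrors the shape of `KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of())`: one coder for the key type, one for the value type, combined into a coder for the pair that the row mapper returns.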

Upvotes: 3
