Balu

Reputation: 476

How do I run a Beam pipeline on Dataflow that accesses a Google Cloud SQL instance?

When I run my pipeline from my local machine, I can update the table that resides in the Cloud SQL instance. But when I moved it to run with the DataflowRunner, it fails with the exception below.

To connect from Eclipse, I created the data source configuration as .create("com.mysql.jdbc.Driver", "jdbc:mysql://<ip of sql instance>:3306/mydb").

I changed this to .create("com.mysql.jdbc.GoogleDriver", "jdbc:google:mysql://<project-id>:<instance-name>/my-db") when running with the Dataflow runner.

  1. Should I prefix the zone information of the instance to <instance-name>?
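
For reference, here is roughly how I wired each configuration into JdbcIO (a minimal sketch; the host, project, instance, and database names are placeholders):

    // Local run: the stock MySQL driver against the instance's public IP (placeholder values)
    JdbcIO.DataSourceConfiguration localConfig = JdbcIO.DataSourceConfiguration
            .create("com.mysql.jdbc.Driver", "jdbc:mysql://<ip-of-sql-instance>:3306/mydb")
            .withUsername("root").withPassword("root");

    // Dataflow run: the GoogleDriver with the project id and instance name (placeholder values)
    JdbcIO.DataSourceConfiguration dataflowConfig = JdbcIO.DataSourceConfiguration
            .create("com.mysql.jdbc.GoogleDriver", "jdbc:google:mysql://<project-id>:<instance-name>/my-db")
            .withUsername("root").withPassword("root");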

The exception I get when I run this is given below:

Jun 22, 2017 6:53:58 PM org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2017-06-22T13:23:51.583Z: (840be37ab35d3d0d): Starting 2 workers in us-central1-f...
Jun 22, 2017 6:53:58 PM org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2017-06-22T13:23:51.634Z: (dabfae1dc9365d10): Executing operation JdbcIO.Read/Create.Values/Read(CreateSource)+JdbcIO.Read/ParDo(Read)+JdbcIO.Read/ParDo(Anonymous)+JdbcIO.Read/GroupByKey/Reify+JdbcIO.Read/GroupByKey/Write
Jun 22, 2017 6:54:49 PM org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2017-06-22T13:24:44.762Z: (21395b94f8bf7f61): Workers have started successfully.

SEVERE: 2017-06-22T13:25:30.214Z: (3b988386f963503e): java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.sql.SQLException: Cannot load JDBC driver class 'com.mysql.jdbc.GoogleDriver'
    at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:289)
    at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:261)
    at com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:55)
    at com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:43)
    at com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:78)
    at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory.create(MapTaskExecutorFactory.java:152)
    at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.doWork(DataflowWorker.java:272)
    at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244)
    at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:125)
    at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:105)
    at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:92)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.sql.SQLException: Cannot load JDBC driver class 'com.mysql.jdbc.GoogleDriver'
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$Read$ReadFn$auxiliary$M7MKjX9p.invokeSetup(Unknown Source)
    at com.google.cloud.dataflow.worker.runners.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:65)
    at com.google.cloud.dataflow.worker.runners.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:47)
    at com.google.cloud.dataflow.worker.runners.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:100)
    at com.google.cloud.dataflow.worker.runners.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:70)
    at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory.createParDoOperation(MapTaskExecutorFactory.java:365)
    at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:278)
    ... 14 more

Any help fixing this is really appreciated. This is my first attempt to run a Beam pipeline as a Dataflow job.

DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setNumWorkers(2);
options.setProject("xxxxx");
options.setStagingLocation("gs://xxxx/staging");
options.setRunner(DataflowRunner.class);
options.setStreaming(false);
options.setTempLocation("gs://xxxx/tempbucket");
options.setJobName("sqlpipeline");

Pipeline dataflowPipeline = Pipeline.create(options);
PCollection<Account> collection = dataflowPipeline.apply(JdbcIO.<Account>read()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
                .create("com.mysql.jdbc.GoogleDriver", "jdbc:google:mysql://project-id:testdb/db")
                .withUsername("root").withPassword("root"))
        .withQuery(
                "select account_id,account_parent,account_description,account_type,account_rollup,Custom_Members from account")
        .withCoder(AvroCoder.of(Account.class))
        .withStatementPreparator(new JdbcIO.StatementPreparator() {
            public void setParameters(PreparedStatement preparedStatement) throws Exception {
                preparedStatement.setFetchSize(1);
                preparedStatement.setFetchDirection(ResultSet.FETCH_FORWARD);
            }
        })
        .withRowMapper(new JdbcIO.RowMapper<Account>() {
            public Account mapRow(ResultSet resultSet) throws Exception {
                // Read every column from the result set into the Account object
                Account account = new Account();
                account.setAccount_id(resultSet.getInt("account_id"));
                account.setAccount_parent(resultSet.getInt("account_parent"));
                account.setAccount_description(resultSet.getString("account_description"));
                account.setAccount_type(resultSet.getString("account_type"));
                account.setAccount_rollup(resultSet.getString("account_rollup"));
                account.setCustom_Members(resultSet.getString("Custom_Members"));
                return account;
            }
        }));

Upvotes: 0

Views: 1715

Answers (2)

Ayeshmantha Perera

Reputation: 909

Hi, I think it's better to go with "com.mysql.jdbc.Driver", because the GoogleDriver is meant for App Engine deployments.

This is what my pipeline configuration looks like, and it works perfectly fine for me:

PCollection<KV<Double, Double>> exchangeRates = p.apply(JdbcIO.<KV<Double, Double>>read()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
                .create("com.mysql.jdbc.Driver", "jdbc:mysql://ip:3306/dbname?user=root&password=root&useUnicode=true&characterEncoding=UTF-8"))
        .withQuery("SELECT PERIOD_YEAR, PERIOD_YEAR FROM SALE")
        .withCoder(KvCoder.of(DoubleCoder.of(), DoubleCoder.of()))
        .withRowMapper(new JdbcIO.RowMapper<KV<Double, Double>>() {
            @Override
            public KV<Double, Double> mapRow(java.sql.ResultSet resultSet) throws Exception {
                LOG.info(resultSet.getDouble(1) + "Came");
                return KV.of(resultSet.getDouble(1), resultSet.getDouble(2));
            }
        }));

Hope it helps.

Upvotes: 0

Alex Amato

Reputation: 1725

Have you properly pulled in the com.google.cloud.sql/mysql-socket-factory Maven dependency? It looks like you are failing to load the class.

https://cloud.google.com/appengine/docs/standard/java/cloud-sql/#Java_Connect_to_your_database
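
If that dependency is in place, one option (a sketch based on the linked docs, not something I have verified against your project) is to keep the stock com.mysql.jdbc.Driver on Dataflow and route the connection through the Cloud SQL socket factory, so the JDBC URL names the instance connection rather than an IP. The instance connection name, database, and credentials below are placeholders:

    // Assumes the com.google.cloud.sql:mysql-socket-factory artifact is on the worker classpath
    JdbcIO.DataSourceConfiguration config = JdbcIO.DataSourceConfiguration
            .create("com.mysql.jdbc.Driver",
                    "jdbc:mysql://google/<database>"
                            + "?cloudSqlInstance=<project-id>:<region>:<instance-name>"
                            + "&socketFactory=com.google.cloud.sql.mysql.SocketFactory")
            .withUsername("root").withPassword("root");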

Upvotes: 1
