Reputation: 476
When i run my pipeline from local machine, i can update the table which resides in the cloud Sql instance. But, when i moved this to run using DataflowRunner, the same is failing with the below exception.
To connect from my eclipse, I created the data source config as
.create("com.mysql.jdbc.Driver", "jdbc:mysql://<ip of sql instance > :3306/mydb")
.
The same i changed to
.create("com.mysql.jdbc.GoogleDriver", "jdbc:google:mysql://<project-id>:<instance-name>/my-db")
while running through the Dataflow runner.
The exception i get when i run this is given below :
Jun 22, 2017 6:53:58 PM org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2017-06-22T13:23:51.583Z: (840be37ab35d3d0d): Starting 2 workers in us-central1-f...
Jun 22, 2017 6:53:58 PM org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2017-06-22T13:23:51.634Z: (dabfae1dc9365d10): Executing operation JdbcIO.Read/Create.Values/Read(CreateSource)+JdbcIO.Read/ParDo(Read)+JdbcIO.Read/ParDo(Anonymous)+JdbcIO.Read/GroupByKey/Reify+JdbcIO.Read/GroupByKey/Write
Jun 22, 2017 6:54:49 PM org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2017-06-22T13:24:44.762Z: (21395b94f8bf7f61): Workers have started successfully.
SEVERE: 2017-06-22T13:25:30.214Z: (3b988386f963503e): java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.sql.SQLException: Cannot load JDBC driver class 'com.mysql.jdbc.GoogleDriver'
at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:289)
at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:261)
at com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:55)
at com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:43)
at com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:78)
at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory.create(MapTaskExecutorFactory.java:152)
at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.doWork(DataflowWorker.java:272)
at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244)
at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:125)
at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:105)
at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:92)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.sql.SQLException: Cannot load JDBC driver class 'com.mysql.jdbc.GoogleDriver'
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
at org.apache.beam.sdk.io.jdbc.JdbcIO$Read$ReadFn$auxiliary$M7MKjX9p.invokeSetup(Unknown Source)
at com.google.cloud.dataflow.worker.runners.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:65)
at com.google.cloud.dataflow.worker.runners.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:47)
at com.google.cloud.dataflow.worker.runners.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:100)
at com.google.cloud.dataflow.worker.runners.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:70)
at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory.createParDoOperation(MapTaskExecutorFactory.java:365)
at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:278)
... 14 more
Any help to fix this is really appreciated. This is my first attempt to run a beam pipeline as a dataflow job.
PipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
((DataflowPipelineOptions) options).setNumWorkers(2);
((DataflowPipelineOptions)options).setProject("xxxxx");
((DataflowPipelineOptions)options).setStagingLocation("gs://xxxx/staging");
((DataflowPipelineOptions)options).setRunner(DataflowRunner.class);
((DataflowPipelineOptions)options).setStreaming(false);
options.setTempLocation("gs://xxxx/tempbucket");
options.setJobName("sqlpipeline");
PCollection<Account> collection = dataflowPipeline.apply(JdbcIO.<Account>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
.create("com.mysql.jdbc.GoogleDriver", "jdbc:google:mysql://project-id:testdb/db")
.withUsername("root").withPassword("root"))
.withQuery(
"select account_id,account_parent,account_description,account_type,account_rollup,Custom_Members from account")
.withCoder(AvroCoder.of(Account.class)).withStatementPreparator(new JdbcIO.StatementPreparator() {
public void setParameters(PreparedStatement preparedStatement) throws Exception {
preparedStatement.setFetchSize(1);
preparedStatement.setFetchDirection(ResultSet.FETCH_FORWARD);
}
}).withRowMapper(new JdbcIO.RowMapper<Account>() {
public Account mapRow(ResultSet resultSet) throws Exception {
Account account = new Account();
account.setAccount_id(resultSet.getInt("account_id"));
account.setAccount_parent(resultSet.getInt("account_parent"));
account.setAccount_description(resultSet.getString("account_description"));
account.setAccount_type(resultSet.getString("account_type"));
account.setAccount_rollup("account_rollup");
account.setCustom_Members("Custom_Members");
return account;
}
}));
Upvotes: 0
Views: 1715
Reputation: 909
Hi I think it's better to move on with "com.mysql.jdbc.Driver" because google driver is supporting for app engine deployments
So as it goes this is what my pipeline configurations look alike and it works perfectly fine for me
PCollection < KV < Double, Double >> exchangeRates = p.apply(JdbcIO. < KV < Double, Double >> read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", "jdbc:mysql://ip:3306/dbname?user=root&password=root&useUnicode=true&characterEncoding=UTF-8")
)
.withQuery(
"SELECT PERIOD_YEAR, PERIOD_YEAR FROM SALE")
.withCoder(KvCoder.of(DoubleCoder.of(), DoubleCoder.of()))
.withRowMapper(new JdbcIO.RowMapper < KV < Double, Double >> () {
@Override
public KV<Double, Double> mapRow(java.sql.ResultSet resultSet) throws Exception {
LOG.info(resultSet.getDouble(1)+ "Came");
return KV.of(resultSet.getDouble(1), resultSet.getDouble(2));
}
}));
Hope it will help
Upvotes: 0
Reputation: 1725
Have you properly pulled in the com.google.cloud.sql/mysql-socket-factory maven dependency? Looks like you are failing to load the class.
https://cloud.google.com/appengine/docs/standard/java/cloud-sql/#Java_Connect_to_your_database
Upvotes: 1