Reputation: 5323
I have a Flink job derived from the starter Maven project. The job has a source that opens a Postgres JDBC connection. I am executing the job on my own Flink session cluster using the example docker-compose.yml.
When I submit the job for the first time it executes successfully. When I try to submit it again I get the following error:
Caused by: java.sql.SQLException: No suitable driver found for jdbc:postgresql://host.docker.internal:5432/postgres?user=postgres&password=mypassword
at java.sql.DriverManager.getConnection(DriverManager.java:689)
at java.sql.DriverManager.getConnection(DriverManager.java:270)
at com.myorg.project.JdbcPollingSource.run(JdbcPollingSource.java:25)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
I have to restart my cluster in order to rerun my job. Why is this happening? How can I submit my job again without having to restart the cluster?
The only addition to the Maven starter project is:
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.2.24</version>
</dependency>
The Flink source does nothing but open a JDBC connection and is as follows:
package com.mycompany;

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;

public class JdbcSource extends RichSourceFunction<Integer> {
    private final String connString;

    public JdbcSource(String connString) {
        this.connString = connString;
    }

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        // Open (and immediately close) a connection; this is enough to reproduce the error.
        try (Connection conn = DriverManager.getConnection(this.connString)) {
        }
    }

    @Override
    public void cancel() {
    }
}
I have tested this on Flink version 1.14.0 and 1.13.2 with the same results.
Note that this question suggests the workaround of calling Class.forName("org.postgresql.Driver"); within my RichSourceFunction. However, I would like to know what is going on.
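For reference, here is roughly what that workaround looks like applied to my source (a sketch only; open() is the RichSourceFunction lifecycle hook that runs before run()):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;

public class JdbcSource extends RichSourceFunction<Integer> {
    private final String connString;

    public JdbcSource(String connString) {
        this.connString = connString;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Loading the class runs its static initializer, which registers the
        // driver with java.sql.DriverManager under this job's user classloader.
        Class.forName("org.postgresql.Driver");
    }

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        try (Connection conn = DriverManager.getConnection(this.connString)) {
        }
    }

    @Override
    public void cancel() {
    }
}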
Upvotes: 6
Views: 1569
Reputation: 356
According to the official documentation of the PostgreSQL JDBC driver, if you are using Java 1.6+, you can just put the driver's jar file on the classpath; the JVM will load the driver automatically. So the question is how to place the driver's jar file on the classpath.
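(That automatic loading works because JDBC 4.0+ drivers declare themselves through a service file inside the jar. The PostgreSQL jar contains the service file
    META-INF/services/java.sql.Driver
whose single line names the driver class to register:
    org.postgresql.Driver
so any jar on the classpath is picked up without a Class.forName call.)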
Since you are using Docker to deploy a session cluster, there are two ways that may work:
Run and access the image with the command:
docker run -it -v $PWD:/tmp/flink <address to image> bash
Copy the driver's jar file into the folder /opt/flink/lib.
Create a new image from the container. Since /opt/flink/lib is on the classpath by default, the driver's jar file is now on the classpath.
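A sketch of that flow using docker commit (image name, container name, and jar version are placeholders; adjust to your setup):

# start a container from the Flink image, mounting the current directory
docker run -it --name flink-jdbc -v $PWD:/tmp/flink flink:1.14.0 bash
# inside the container, put the driver jar on Flink's default classpath
cp /tmp/flink/postgresql-42.2.24.jar /opt/flink/lib/
exit
# snapshot the modified container as a new image
docker commit flink-jdbc flink:1.14.0-postgres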
Alternatively, add the maven-assembly-plugin to the pom.xml of your Maven project, then recompile your project to get a jar with dependencies. In that jar, the PostgreSQL JDBC driver is packaged together with your code.
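A minimal sketch of that plugin configuration (the plugin version is just an example):

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>3.3.0</version>
    <configuration>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>

Running mvn package then produces an additional *-jar-with-dependencies.jar, which is the one you submit to the cluster.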
Upvotes: 0
Reputation: 1223
I have this pom.xml dependency for Postgres for Apache Flink 1.13:
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>9.4-1201-jdbc41</version>
</dependency>
You can have a Postgres connector class, for example:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class PostgreSQLConnector {
    private static volatile PostgreSQLConnector instance;
    private Connection connectionDB = null;

    public PostgreSQLConnector(your params) {
        ...
    }

    // Double-checked locking so only one connector instance is created.
    public static PostgreSQLConnector getInstance() {
        PostgreSQLConnector postgreSQLConnector = instance;
        if (postgreSQLConnector != null)
            return postgreSQLConnector;
        synchronized (PostgreSQLConnector.class) {
            if (instance == null) {
                instance = new PostgreSQLConnector(your params);
            }
            return instance;
        }
    }

    public Connection getConnectionDB() throws SQLException {
        if (checkNullConnection()) CreateConnection();
        return connectionDB;
    }

    public void CheckConnection() throws SQLException {
        if (checkNullConnection()) CreateConnection();
    }

    public void CreateConnection() throws SQLException {
        try {
            // Explicitly load the driver class so it registers with DriverManager.
            Class.forName(sink.driverName);
            connectionDB = DriverManager.getConnection(fullUrl, username, password);
        } catch (Exception e) {
            ...
        }
    }

    public boolean checkNullConnection() throws SQLException {
        return (connectionDB == null || connectionDB.isClosed());
    }
}
Then you can create a RichSourceFunction and create the connection in the overridden open() method, not in run():
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.SQLException;

public class JdbcSource extends RichSourceFunction<Integer> {
    private final String connString;
    private static Connection dbConnection;
    private static final PostgreSQLConnector postgreSQLConnector = PostgreSQLConnector.getInstance();

    public JdbcSource(String connString) {
        this.connString = connString;
    }

    @Override
    public void open(Configuration parameters) throws SQLException {
        dbConnection = postgreSQLConnector.getConnectionDB();
    }

    @Override
    public void close() throws Exception {
        if (dbConnection != null) dbConnection.close();
    }

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        // do something here with the connection
    }

    @Override
    public void cancel() {
    }
}
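For completeness, a sketch of wiring the source into a job (the connection string is a placeholder):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// minimal usage sketch for the JdbcSource above
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new JdbcSource("jdbc:postgresql://host:5432/postgres?user=...&password=..."))
   .print();
env.execute("jdbc-source-job");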
You could try something like that; it should work.
Upvotes: 0
Reputation: 64
For the first question, you can refer to JDBC driver cannot be found when reading a DataSet from an SQL database in Apache Flink.
Second, if you use session mode, it is easy to rerun the Flink job without restarting the cluster: you can log in to the JobManager shell and resubmit the job with the flink run command.
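For example (the container name is a placeholder for your docker-compose setup):

# open a shell in the JobManager container
docker exec -it <jobmanager-container> bash
# resubmit the job jar from inside the container
./bin/flink run /path/to/your-job.jar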
Class.forName("org.postgresql.Driver"); will trigger the driver's static initializer block, so DriverManager can get the driver class. See:
// from org.postgresql.Driver
static {
    try {
        register();
    } catch (SQLException var1) {
        throw new ExceptionInInitializerError(var1);
    }
}
Upvotes: 0