Dan

Reputation: 5323

How can I rerun an Apache Flink Postgres JDBC job without getting "No suitable driver found" exception

I have a Flink job derived from the starter Maven project. That job has a source that opens a Postgres JDBC connection. I am executing the job on my own Flink session cluster using the example docker-compose.yml.

When I submit the job for the first time it executes successfully. When I try to submit it again I get the following error:

Caused by: java.sql.SQLException: No suitable driver found for jdbc:postgresql://host.docker.internal:5432/postgres?user=postgres&password=mypassword
    at java.sql.DriverManager.getConnection(DriverManager.java:689)
    at java.sql.DriverManager.getConnection(DriverManager.java:270)
    at com.myorg.project.JdbcPollingSource.run(JdbcPollingSource.java:25)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)

I have to restart my cluster in order to rerun my job. Why is this happening? How can I submit my job again without having to restart the cluster?

The only addition to the Maven starter project is:

<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.2.24</version>
</dependency>

The Flink source does nothing but open a JDBC connection and is as follows:

package com.mycompany;

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;

public class JdbcSource extends RichSourceFunction<Integer> {

    private final String connString;

    public JdbcSource(String connString) {
        this.connString = connString;
    }

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        try (Connection conn = DriverManager.getConnection(this.connString)) {

        }
    }

    @Override
    public void cancel() {
    }
}

I have tested this on Flink version 1.14.0 and 1.13.2 with the same results.

Note that this question provides a solution of using Class.forName("org.postgresql.Driver"); within my RichSourceFunction. However I would like to know what is going on.

Upvotes: 6

Views: 1569

Answers (3)

Thesharing

Reputation: 356

According to the official documentation of the PostgreSQL JDBC driver, on Java 1.6+ you can simply put the driver's jar file on the classpath and the JVM will load the driver automatically. So the question becomes: how do you place the driver's jar file on the classpath?
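For reference, this auto-loading works through the JDK's service-provider mechanism: DriverManager scans the classpath for a provider-configuration file inside each jar. The PostgreSQL driver jar ships one like this (shown schematically):

```
# inside postgresql-42.2.24.jar
META-INF/services/java.sql.Driver    <- provider-configuration file
    org.postgresql.Driver            <- its single line of content
```

If the jar (and hence this file) is visible to the classloader that calls DriverManager.getConnection, no explicit Class.forName is needed.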

Since you are using Docker to deploy a session cluster, there are two ways that may work:

  1. Put the driver's jar file into docker image

Run and access the image with the command:

docker run -it -v $PWD:/tmp/flink <address to image> bash

Copy the driver's jar file into the folder /opt/flink/lib.

Create a new image from the container. Since /opt/flink/lib is on the classpath by default, the driver's jar file is now on the classpath.

  2. Package the driver's jar into your user jar

Add the maven-assembly-plugin to the pom.xml of your Maven project, then recompile to get a jar with dependencies. In this jar, the PostgreSQL JDBC driver is packaged together with your code.
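A minimal sketch of that plugin configuration, assuming the standard jar-with-dependencies descriptor (the plugin version here is illustrative):

```xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>3.3.0</version>
    <configuration>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>
```

After mvn package, submit the resulting *-jar-with-dependencies.jar to the cluster.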

Upvotes: 0

Alter

Reputation: 1223

I have this pom.xml dependency for Postgres for Apache Flink 1.13:

        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>9.4-1201-jdbc41</version>
        </dependency>

You can have a Postgres connector class, for example:

public class PostgreSQLConnector {
    private static volatile PostgreSQLConnector instance;
    private Connection connectionDB = null;

    // connection settings, filled in from your params
    private String driverName;
    private String fullUrl;
    private String username;
    private String password;

    private PostgreSQLConnector(/* your params */) {
        // assign driverName, fullUrl, username and password here
    }

    // double-checked locking singleton
    public static PostgreSQLConnector getInstance() {
        PostgreSQLConnector postgreSQLConnector = instance;
        if (postgreSQLConnector != null)
            return postgreSQLConnector;
        synchronized (PostgreSQLConnector.class) {
            if (instance == null) {
                instance = new PostgreSQLConnector(/* your params */);
            }
            return instance;
        }
    }

    public Connection getConnectionDB() throws SQLException {
        if (checkNullConnection()) createConnection();
        return connectionDB;
    }

    public void checkConnection() throws SQLException {
        if (checkNullConnection()) createConnection();
    }

    public void createConnection() throws SQLException {
        try {
            // loading the driver class runs its static registration block
            Class.forName(driverName);
            connectionDB = DriverManager.getConnection(fullUrl, username, password);
        } catch (Exception e) {
            throw new SQLException(e);
        }
    }

    public boolean checkNullConnection() throws SQLException {
        return (connectionDB == null || connectionDB.isClosed());
    }
}

Then you can create a RichSourceFunction and create the connection in the overridden open() method, not in run():

public class JdbcSource extends RichSourceFunction<Integer> {

    private final String connString;
    private static Connection dbConnection;
    private static final PostgreSQLConnector postgreSQLConnector = PostgreSQLConnector.getInstance();

    public JdbcSource(String connString) {
        this.connString = connString;
    }

    @Override
    public void open(Configuration parameters) throws SQLException {
        dbConnection = postgreSQLConnector.getConnectionDB();
    }

    @Override
    public void close() throws Exception {
        if (dbConnection != null) dbConnection.close();
    }

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
       // do something here with the connection
    }

    @Override
    public void cancel() {
    }
}

You could try something like that; it should work.

Upvotes: 0

DogeKing

Reputation: 64

For the first question, you can refer to JDBC driver cannot be found when reading a DataSet from an SQL database in Apache Flink.

Second, if you use session mode, it is easy to rerun a Flink job without restarting the cluster: you can log in to the JobManager shell and resubmit the job from the command line.

Class.forName("org.postgresql.Driver"); triggers the class's static initializer block, so DriverManager can find the driver class. See:

// from org.postgresql.Driver
static {
    try {
        register();
    } catch (SQLException var1) {
        throw new ExceptionInInitializerError(var1);
    }
}
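The mechanism can be demonstrated without the Postgres driver at all. This self-contained sketch (all names hypothetical) mimics the pattern: a class registers itself in a central registry from its static initializer, and Class.forName is what triggers that initializer, just as org.postgresql.Driver registers itself with java.sql.DriverManager:

```java
import java.util.ArrayList;
import java.util.List;

public class StaticInitDemo {
    static final List<String> REGISTRY = new ArrayList<>();

    // Mimics org.postgresql.Driver: the static block runs register()
    // the first time the class is *initialized*, not merely loaded.
    static class SelfRegistering {
        static {
            REGISTRY.add("registered");
        }
    }

    public static void main(String[] args) throws Exception {
        // A class literal loads the class but does not initialize it,
        // so the static block has not run yet.
        String name = SelfRegistering.class.getName();
        System.out.println("before: " + REGISTRY.size()); // 0

        // Class.forName initializes the class, running the static block.
        Class.forName(name);
        System.out.println("after: " + REGISTRY.size());  // 1
    }
}
```

This is why calling Class.forName in your source function makes DriverManager work again: it forces the driver's registration to run under the current classloader.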

Upvotes: 0
