Balu

Reputation: 476

Reading bulk data from a database using Apache Beam

I would like to know how JdbcIO would execute a query in parallel if my query returns millions of rows. I have referred to https://issues.apache.org/jira/browse/BEAM-2803 and the related pull requests, but I couldn't understand it completely.

The ReadAll expand method uses a ParDo. Would it therefore create multiple connections to the database to read the data in parallel? If I restrict the number of connections that can be created to the DB in the datasource, will it stick to that connection limit?

Can anyone please help me understand how this would be handled in JdbcIO? I am using version 2.2.0.

Update:

.apply(
          ParDo.of(
              new ReadFn<>(
                  getDataSourceConfiguration(),
                  getQuery(),
                  getParameterSetter(),
                  getRowMapper())))

The above code shows that ReadFn is applied with a ParDo. I think the ReadFn will run in parallel. If my assumption is correct, how would I use the readAll() method to read from a DB where I can establish only a limited number of connections at a time?
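For context, a minimal sketch of what a readAll() call could look like (the table, column, and variable names here are placeholders I am assuming, not taken from the original pipeline; `p` is a `Pipeline` and `config` a `JdbcIO.DataSourceConfiguration` backed by a pooled datasource). Each element of the input PCollection fills the query's parameter, and the number of open connections is bounded by the datasource's pool, not by the number of elements:

```java
// Sketch only: assumes a Pipeline `p` and a DataSourceConfiguration `config`
// created from a connection-pooled DataSource with a capped pool size.
PCollection<Integer> deptIds = p.apply(Create.of(1, 2, 3));

PCollection<String> names = deptIds.apply(
    JdbcIO.<Integer, String>readAll()
        .withDataSourceConfiguration(config)
        .withQuery("SELECT first_name FROM employees WHERE dept_no = ?")
        // Each input element sets the prepared statement's parameter,
        // so one query runs per element, reusing pooled connections.
        .withParameterSetter((id, stmt) -> stmt.setInt(1, id))
        .withRowMapper(rs -> rs.getString(1))
        .withCoder(StringUtf8Coder.of()));
```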

Thanks Balu

Upvotes: 3

Views: 3753

Answers (3)

Dima Zagreba

Reputation: 1

I had a similar task. I got the count of records from the database and split it into ranges of 1000 records, then applied readAll to a PCollection of those ranges; here is a description of the solution. And thanks, Balu, regarding the datasource configuration.
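The splitting step described above can be sketched in plain Java (the range size of 1000 comes from the answer; the `Range` holder class and the total count are assumptions for illustration). The resulting list would then be turned into a PCollection and fed to readAll():

```java
import java.util.ArrayList;
import java.util.List;

// Simple holder for a half-open [start, end) row-offset range.
class Range {
    final long start; // offset of the first row in the range
    final long end;   // one past the offset of the last row
    Range(long start, long end) { this.start = start; this.end = end; }
}

class RangeSplitter {
    // Split [0, totalCount) into consecutive ranges of at most rangeSize rows.
    static List<Range> split(long totalCount, long rangeSize) {
        List<Range> ranges = new ArrayList<>();
        for (long start = 0; start < totalCount; start += rangeSize) {
            ranges.add(new Range(start, Math.min(start + rangeSize, totalCount)));
        }
        return ranges;
    }
}
```

Each range could then map onto a parameterized query such as `... LIMIT ? OFFSET ?` inside the readAll parameter setter.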

Upvotes: 0

Balu

Reputation: 476

I created a DataSource as follows.

    // c3p0 pooled DataSource; the pool size caps concurrent DB connections
    ComboPooledDataSource cpds = new ComboPooledDataSource();
    cpds.setDriverClass("com.mysql.jdbc.Driver"); // loads the JDBC driver
    cpds.setJdbcUrl("jdbc:mysql://<IP>:3306/employees");
    cpds.setUser("root");
    cpds.setPassword("root");
    cpds.setMaxPoolSize(5); // at most 5 simultaneous connections

There is a better way to set this driver now. I set the database pool size to 5. While doing the JdbcIO transform, I used this datasource to create the connection. In the pipeline, I set

option.setMaxNumWorkers(5);
option.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);

I used a query which would return around 3 million records. While observing the DB connections, I saw the number of connections gradually increase while the program was running, and it used at most 5 connections at certain points. I think this is how we can limit the number of connections created to a DB while running a JdbcIO transformation to load a bulk amount of data from a database.
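Wiring the pooled datasource into the read could look roughly like this (a sketch only; the query, column, and variable names are placeholders I am assuming, not from the original pipeline). `JdbcIO.DataSourceConfiguration.create` accepts a `javax.sql.DataSource`, so the c3p0 pool cap set above is honored on each worker:

```java
// Sketch: `pipeline` is the Pipeline, `cpds` the ComboPooledDataSource above.
PCollection<String> names = pipeline.apply(
    JdbcIO.<String>read()
        .withDataSourceConfiguration(
            // Passing the pooled DataSource means connections come from
            // the pool, bounded by setMaxPoolSize(5).
            JdbcIO.DataSourceConfiguration.create(cpds))
        .withQuery("SELECT first_name FROM employees")
        .withRowMapper(rs -> rs.getString(1))
        .withCoder(StringUtf8Coder.of()));
```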

Maven dependency for ComboPooledDataSource

    <dependency>
        <groupId>c3p0</groupId>
        <artifactId>c3p0</artifactId>
        <version>0.9.1.2</version>
    </dependency>

*Please feel free to correct the answer if I missed something here.*

Upvotes: 1

Lara Schmidt

Reputation: 309

The ReadAll method handles the case where you have many queries. You can store the queries as a PCollection of strings, where each string is one query. Then, when reading, each item is processed as a separate query in a single ParDo.

This does not work well for a small number of queries because it limits parallelism to the number of queries. But if you have many, it will perform much faster. This is the case for most of the ReadAll calls.

From the code, it looks like a connection is made per worker in the setup function. This might cover several queries, depending on the number of workers and the number of queries.

Where is the query limit set? It should behave similarly with or without ReadAll.

See the jira for more information: https://issues.apache.org/jira/browse/BEAM-2706

I am not very familiar with JdbcIO, but it seems they implemented the version suggested in that JIRA, where the PCollection can be of anything and a callback modifies the query depending on the element in the PCollection. This allows each item in the PCollection to represent a query, but it is a bit more flexible than having a new query as each element.

Upvotes: 1

Related Questions