Anudeep Reddy
Anudeep Reddy

Reputation: 1

How to implement connection pooling for PostgreSQL in Google Cloud Dataflow

#Source Configuration

source_config = relational_db.SourceConfiguration(
             drivername=CONFIG['drivername'],
                     host=CONFIG['host'],
                     port=CONFIG['port'],
                     database=CONFIG['database'],
                     username=CONFIG['username'],
                     password=CONFIG['password']
                    )
# Target database table
customer_purchase_config = relational_db.TableConfiguration(
    name = 'customer_purchase',
    create_if_missing = False,
    primary_key_columns = ['purchaseId']
    )

with beam.Pipeline(options=options) as p:

    res = (
        p
        | "Read data from PubSub"
        >> beam.io.ReadFromPubSub(subscription=SUB).with_output_types(bytes)
        |'Transformation' >> (beam.ParDo(PubSubToDict()))
    )
    customer_purchase = res | beam.ParDo(Customer_Purchase())
    customer_purchase | 'Writing to customer_purchase' >> relational_db.Write(
        source_config=source_config,
        table_config=customer_purchase_config
        )

So when I am trying with these configuration I am able to insert and update data in PostgreSQL but when I receive huge spike in inputs then My connection limits are reaching and number of retries from worker nodes are increasing so is there any way to define the connection pool so that I can reuse connections.

Upvotes: 0

Views: 1753

Answers (2)

eatmeimadanish
eatmeimadanish

Reputation: 3907

I have an async python pooling option here that I use in production. It works extremely well.

Python Postgres psycopg2 ThreadedConnectionPool exhausted

Upvotes: 0

Chirag Shankar
Chirag Shankar

Reputation: 21

Check out this pull request example of how to manage connection pooling and batched writes in Dataflow - Python SDK to Cloud SQL.

The idea is to take advantage of the DoFn.Setup() method which runs once upon object creation and will create a static connection pool for each python interpreter. This pool will exist for the life of the object and will you to then open and close the sessions by renting and returning the connections back to the pool for reuse.

After some testing, the issue isn't the number of connections, but the number of new connections that are being created and closed every second. Each new connection would require a thread on the SQL server to be spun up in order to handle the transaction, this is an anti-pattern and leads to a lot of overhead. Assuming you are using Beam Nuggets, the library creates a new connection and disposes of it after every bundle. If you have small bundles, then it will be creating new connections way too quickly. This poses an issue on the SQL server side as each transaction to the same affected record is now handled by different threads, which can lead to row lock contention between the threads and cause performance bottle necks as stated in Cloud SQL documentation.

Something else to keep in mind is the number of keys and the size of the bundles. One of the factors that determines bundle sizes is the amount of data available for each key. When reading from Pub/Sub, this defaults to 1024 keys with the purpose of helping to filter duplicate messages. You can use a shuffle step, (GBK) GroupByKey to re-key into a set amount of keys, for example 40. This helps to increase bundle sizes as well. If correctly implemented, you should combine the same transactions for the same records into the same key and then batch that as one whole transaction to Cloud SQL. This allows Cloud SQL to optimize performance and reduces the thread overhead in locking rows.

My recommendation is to use a GroupByKey to have fewer than 1024 keys, which leads to larger bundle sizes and groups each transaction for the affected record together. You can then use the patch to batch to CSQL with the added benefit of handling connection pooling. This creates fewer new connections and thus less overhead in Cloud SQL.

Upvotes: 1

Related Questions