Gatlack

Reputation: 148

Can I use multiprocessing for querying different servers with sqlalchemy?

I have several SQL servers that I want to query in parallel. Since it isn't one server I'm querying multiple times but many servers I query only once each, I tried to put the requests into separate processes:

import pandas as pd
from sqlalchemy import create_engine
from multiprocessing import Pool, cpu_count

def get_df(engine):
    sql_string = "select * from sys.all_columns"
    df = pd.read_sql(sql=sql_string, con=engine)
    return df


def create_odbc_engine(server):
    db_odbc_string = "mssql+pyodbc://@{server}-db:9999/some_database?driver=ODBC+Driver+17+for+SQL+Server".format(
        server=server)
    return create_engine(db_odbc_string)


if __name__ == "__main__":
    servers = ["server1", "server2", "server3",...]
    args = [(create_odbc_engine(server),) for server in servers]
    n_processes = cpu_count() - 1
    with Pool(processes=n_processes) as pool:
        results = pool.map(get_df, args)
    

However, I get a pickle error:

AttributeError: Can't pickle local object 'create_engine.<locals>.connect'

Is there any way I can do this in parallel?
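For reference, the same failure can be reproduced without SQLAlchemy at all: `pickle` can only serialize functions that are importable by name at module level, and `create_engine` builds a local `connect` closure inside the engine it returns. A minimal stdlib-only sketch (the `make_connector` name is just a stand-in mimicking that structure):

```python
import pickle

def make_connector():
    # mimics create_engine, which wraps a locally defined
    # connect() closure inside the object it returns
    def connect():
        return "connected"
    return connect

conn = make_connector()
try:
    pickle.dumps(conn)
except AttributeError as e:
    # AttributeError: Can't pickle local object 'make_connector.<locals>.connect'
    print(e)
```

Anything a `Pool` sends to a worker goes through pickle, so an engine holding such a closure can never cross the process boundary.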

Upvotes: 0

Views: 675

Answers (1)

Prithvi Raj

Reputation: 1981

The `Engine` object returned by `create_engine` is not picklable: it holds a locally defined `connect` function (and, once used, live connections), so it cannot be sent to a worker process as an argument. Instead, pass only the server name to each worker and create the engine inside `get_df`:

import pandas as pd
from sqlalchemy import create_engine
from multiprocessing import Pool, cpu_count

def get_df(server):
    engine = create_odbc_engine(server)
    sql_string = "select * from sys.all_columns"
    df = pd.read_sql(sql=sql_string, con=engine)
    return df


def create_odbc_engine(server):
    db_odbc_string = "mssql+pyodbc://@{server}-db:9999/some_database?driver=ODBC+Driver+17+for+SQL+Server".format(
        server=server)
    return create_engine(db_odbc_string)


if __name__ == "__main__":
    servers = ["server1", "server2", "server3",...]
    # args = [(create_odbc_engine(server),) for server in servers]
    n_processes = cpu_count() - 1
    with Pool(processes=n_processes) as pool:
        results = pool.map(get_df, servers)
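Since these queries are I/O-bound (the workers mostly wait on the database), threads are usually a simpler fit than processes and avoid pickling entirely, because nothing crosses a process boundary. A minimal sketch of the same fan-out with `concurrent.futures`; the `get_df` body here is a placeholder standing in for the real query above:

```python
from concurrent.futures import ThreadPoolExecutor

def get_df(server):
    # placeholder for the real engine creation + pd.read_sql call;
    # with threads, the engine never needs to be pickled
    return f"result from {server}"

servers = ["server1", "server2", "server3"]
# one thread per server is reasonable when the count is small
with ThreadPoolExecutor(max_workers=len(servers)) as pool:
    results = list(pool.map(get_df, servers))  # order matches `servers`
```

`ThreadPoolExecutor.map` preserves input order, so `results[i]` corresponds to `servers[i]` just as with `Pool.map`.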

Upvotes: 1
