Reputation: 61
I'm trying to create a Prefect task that receives a PyMySQL connection instance as input, such as:
from typing import Any

import pandas as pd
import pymysql
from prefect import Flow, task


@task
def connect_db():
    connection = pymysql.connect(user=user,
                                 password=password,
                                 host=host,
                                 port=port,
                                 db=db,
                                 connect_timeout=5,
                                 cursorclass=pymysql.cursors.DictCursor,
                                 local_infile=True)
    return connection


@task
def query_db(connection) -> Any:
    query = 'SELECT * FROM myschema.mytable;'
    with connection.cursor() as cur:
        cur.execute(query)
        rows = cur.fetchall()
    return rows


@task
def get_df(rows) -> Any:
    return pd.DataFrame(rows, dtype=str)


@task
def save_csv(df):
    path = 'mypath'
    df.to_csv(path, sep=';', index=False)


with Flow(FLOW_NAME) as f:
    con = connect_db()
    rows = query_db(con)
    df = get_df(rows)
    save_csv(df)
However, when I try to register the resulting flow, it raises "TypeError: cannot pickle 'socket' object". Going through Prefect's docs, I found the built-in MySQL tasks (https://docs.prefect.io/api/latest/tasks/mysql.html#mysqlexecute), but they open and close a connection each time they're called. Is there any way to pass a previously opened connection to a Prefect task (or to implement something like a connection manager)?
Upvotes: 1
Views: 985
Reputation: 687
I tried to replicate your example but it registers fine. The most common way an error like this pops up is if you have a client in the global namespace that the flow uses. Prefect will try to serialize that upon registration. For example, the following code snippet will error if you try to register it:
from typing import Any

import pymysql
from prefect import Flow, task

# The connection lives in the global namespace, so it is pickled with the flow.
connection = pymysql.connect(user=user,
                             password=password,
                             host=host,
                             port=port,
                             db=db,
                             connect_timeout=5,
                             cursorclass=pymysql.cursors.DictCursor,
                             local_infile=True)


@task
def query_db(connection) -> Any:
    query = 'SELECT * FROM myschema.mytable;'
    with connection.cursor() as cur:
        cur.execute(query)
        rows = cur.fetchall()
    return rows


with Flow(FLOW_NAME) as f:
    rows = query_db(connection)
This errors because the connection variable is serialized along with the Flow object. You can work around this by storing your Flow as a script. See this link for more information:
https://docs.prefect.io/core/idioms/script-based.html#using-script-based-flow-storage
This avoids serializing the Flow object, so the connection is created at runtime instead.
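The underlying failure can be reproduced without Prefect or a database. The sketch below assumes only the standard library and uses a bare socket as a stand-in for a live database connection, since a connected PyMySQL client holds an open socket internally. The exact wording of the error message may vary between Python versions.

```python
import pickle
import socket

# Stand-in for a live database connection, which holds an open socket.
sock = socket.socket()
try:
    pickle.dumps(sock)
    error_message = None
except TypeError as exc:
    # Sockets refuse to be pickled, which is what surfaces at registration.
    error_message = str(exc)
finally:
    sock.close()

print(error_message)
```

Anything that contains such an object, including a Flow that references it, fails to pickle in the same way.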
If this happens during runtime
If you hit this error at runtime, there are two likely causes. The first is Dask serializing the object, and the second is Prefect's checkpointing.
Dask uses cloudpickle to send data to the workers across a network. So if you use Prefect with a DaskExecutor, it will use cloudpickle to send the tasks for execution. Thus, task inputs and outputs need to be serializable. In this scenario, you should instantiate the client and perform the query inside a task (like you saw with the current MySQL task implementation).
If you use a LocalExecutor, task outputs are still serialized because checkpointing is on by default. You can turn it off for a task by passing checkpoint=False when you define the task.
If you need further help, feel free to join the Prefect Slack channel at prefect.io/slack.
Upvotes: 3