I'm trying to use a context manager to open an SSH tunnel when Metaflow FlowSpec subclasses are run locally. However, instantiating the FlowSpec subclass inside the context manager appears to enter the context manager twice, and the flow fails because the SSH tunnel is already in use. How can I prevent Metaflow from causing the context manager to be entered twice?
Running the following flow fails because two attempts are made to open the SSH tunnel, even though the context manager is used only once in the code.
from metaflow import FlowSpec
from metaflow import step

from my_pkg.library.databases import initialize_database
from my_pkg.library.ssh import SshTunnel
from my_pkg.settings.databases import DatabaseConfiguration


class TunnelingPipelineExample(FlowSpec):
    @step
    def start(self):
        self.next(self.query_tables)

    @step
    def query_tables(self):
        db, cursor = initialize_database()
        cursor.execute("USE some_database;")
        cursor.execute("SELECT * FROM some_table;")
        for result in cursor.fetchall():
            print(f"RESULT: {result}")
        self.next(self.end)

    @step
    def end(self):
        print(f"Completed pipeline!")


if __name__ == "__main__":
    with SshTunnel(DatabaseConfiguration()):
        TunnelingPipelineExample()
For comparison, the equivalent script without Metaflow:

from my_pkg.library.databases import initialize_database
from my_pkg.library.ssh import SshTunnel
from my_pkg.settings.databases import DatabaseConfiguration


if __name__ == "__main__":
    with SshTunnel(DatabaseConfiguration()):
        db, cursor = initialize_database()
        cursor.execute("USE some_database;")
        cursor.execute("SELECT * FROM some_table;")
        for result in cursor.fetchall():
            print(f"RESULT: {result}")
The same code succeeds without Metaflow, so I assume Metaflow is somehow causing the context manager to be entered twice, given that the error message says the tunnel is already in use.
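One way to check this assumption would be to log the process ID and command line every time the context is entered. The following is only a minimal sketch: traced and ENTRIES are hypothetical names I made up for illustration, not part of Metaflow or my package, and I have not confirmed how Metaflow launches its steps. If the log showed two different PIDs, that would suggest the script itself is being executed more than once, rather than the context manager being entered twice within a single process.

```python
import os
import sys
from contextlib import contextmanager

# Hypothetical diagnostic helper (not part of Metaflow or my_pkg).
# Each entry into the context is recorded here for the current process.
ENTRIES = []


@contextmanager
def traced(name, log=ENTRIES):
    """Record which process and command line entered the context."""
    record = {"name": name, "pid": os.getpid(), "argv": list(sys.argv)}
    log.append(record)
    # Write to stderr so the message is not mixed into normal step output.
    print(f"[enter] {name} pid={record['pid']} argv={record['argv']}",
          file=sys.stderr)
    try:
        yield record
    finally:
        print(f"[exit] {name} pid={record['pid']}", file=sys.stderr)
```

It could be stacked with the tunnel in the flow script, for example `with traced("ssh-tunnel"), SshTunnel(DatabaseConfiguration()):`, so that the trace line is printed just before the tunnel is opened.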