Reputation: 21
I'm loading data into an SQLite database via Luigi with the following code:
class LoadData(luigi.Task):
    def requires(self):
        return TransformData()

    def run(self):
        with sqlite3.connect('database.db') as db:
            cursor = db.cursor()
            cursor.execute("INSERT INTO prod SELECT * FROM staging;")

    def output(self):
        return luigi.LocalTarget('database.db')
This works, but when I want to update or insert new data, the task doesn't execute because Luigi considers it complete (database.db already exists).
Maybe I haven't understood the proper use of LocalTarget. What is the right way to approach this?
///EDIT: My question applies to the example given on this page (code for le_create_db.py). How do you solve updates and inserts in that example?
///EDIT: This question about appending to a file is similar, but the solution using marker files does not work because sqla expects an SQLAlchemyTarget output. Are there any other answers, specifically about appending to a database?
Upvotes: 2
Views: 517
Reputation: 188004
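There are three options, in general:
(1) run the task only once (the default: the task counts as complete as soon as its output exists)
(2) run the task every time (override complete and always return False)
(3) implement custom logic for what complete means (e.g. decide, based on the recency of the last snapshot in the database, whether a new one should be done)
Here are all three variants side by side in a single snippet: https://gist.github.com/miku/fb53af7576bc3e5dd1df3ad627db4d07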
Only once:
def output(self):
    # (1) will run only once
    return luigi.LocalTarget(dbfile)
Always:
def complete(self):
    # (2) will run every time the task is called
    return False
Custom logic:
def complete(self):
    # (3) custom logic, e.g. back off
    # if the last snapshot happened recently
    with sqlite3.connect(dbfile) as db:
        cursor = db.cursor()
        cursor.execute(
            """
            select strftime('%s', 'now') - strftime('%s', t)
            from dummy
            order by t desc
            limit 1
            """
        )
        row = cursor.fetchone()
    if row is None:
        # no snapshot yet, so the task has to run
        return False
    diff_s = row[0]  # seconds since the latest snapshot (t looks like 2023-09-14 09:25:33)
    return int(diff_s) < 10
Upvotes: 0
Reputation: 3010
I had the same issue and was able to solve it by overriding the complete method to simply return False:
def complete(self):
    return False
Now the task is re-run every time, even if the database file is present.
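One thing to watch when a task re-runs every time: the plain INSERT from the question would append the same staging rows again on each run. A minimal sketch of a load that can be repeated safely, assuming prod has a primary key (the helper name load_staging is made up for illustration and is not part of Luigi):

```python
import sqlite3


def load_staging(db_path):
    """Copy staging into prod; rows whose primary key already exists are replaced."""
    db = sqlite3.connect(db_path)
    try:
        with db:  # commits on success, rolls back on error
            # Assumption: prod declares a PRIMARY KEY (or UNIQUE constraint);
            # without one, OR REPLACE behaves like a plain INSERT.
            db.execute("INSERT OR REPLACE INTO prod SELECT * FROM staging")
    finally:
        db.close()
```

Calling load_staging('database.db') from run() would then update existing rows and insert new ones instead of duplicating them.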
Upvotes: 0
Reputation: 78
Consider using a mock file: http://gouthamanbalaraman.com/blog/building-luigi-task-pipeline.html
In each execution you will be creating a new file.
Another solution is to create a marker table inside the database, which is the strategy PostgresTarget uses: https://luigi.readthedocs.io/en/stable/api/luigi.contrib.postgres.html#luigi.contrib.postgres.PostgresTarget
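The marker-table idea can be sketched for SQLite roughly as follows. This is only an illustration of the strategy, not Luigi's own code: the class name SQLiteMarkerTarget, the table name table_updates, and the update_id values are all assumptions. In a real pipeline the class would subclass luigi.Target and be returned from output(); it is kept free of Luigi imports here so it runs on its own.

```python
import sqlite3


class SQLiteMarkerTarget:
    """Hypothetical target that 'exists' once a marker row for update_id is stored."""

    MARKER_TABLE = "table_updates"  # assumed name for the marker table

    def __init__(self, db_path, update_id):
        self.db_path = db_path
        self.update_id = update_id  # identifies one particular load

    def _connect(self):
        db = sqlite3.connect(self.db_path)
        # Create the marker table on first use.
        db.execute(
            f"CREATE TABLE IF NOT EXISTS {self.MARKER_TABLE} ("
            "update_id TEXT PRIMARY KEY, "
            "inserted TEXT DEFAULT CURRENT_TIMESTAMP)"
        )
        return db

    def exists(self):
        """Return True if this update_id has already been recorded."""
        db = self._connect()
        try:
            row = db.execute(
                f"SELECT 1 FROM {self.MARKER_TABLE} WHERE update_id = ?",
                (self.update_id,),
            ).fetchone()
            return row is not None
        finally:
            db.close()

    def touch(self):
        """Record this update_id; call it at the end of the task's run()."""
        db = self._connect()
        try:
            with db:  # commits on success, rolls back on error
                db.execute(
                    f"INSERT OR REPLACE INTO {self.MARKER_TABLE} (update_id) "
                    "VALUES (?)",
                    (self.update_id,),
                )
        finally:
            db.close()
```

With something like this, each load gets its own update_id (for example one per date), so a new batch is not considered complete just because database.db already exists, while a batch that was already loaded is skipped.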
Upvotes: 0