ibuildstuff

Reputation: 21

Luigi/SQLite: How to update database after initial load?

I'm loading data into an SQLite database via Luigi with the following code:

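import sqlite3

import luigi


class LoadData(luigi.Task):

    def requires(self):
        return TransformData()  # upstream task that fills the staging table

    def run(self):
        # Copy the staging rows into prod; the connection's context
        # manager commits the transaction on success.
        with sqlite3.connect('database.db') as db:
            cursor = db.cursor()
            cursor.execute("INSERT INTO prod SELECT * FROM staging;")

    def output(self):
        # Luigi treats the task as complete as soon as this file exists.
        return luigi.LocalTarget('database.db')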

This works, but when I want to update or insert new data, the task doesn't execute because Luigi considers it complete (database.db already exists).
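
Luigi's default Task.complete() returns True when every target returned by output() reports exists(), and for a LocalTarget that is only a check that the path exists, not a look at what is inside. The stdlib sketch below mimics that check (the helper name default_complete is hypothetical, not Luigi's API) to show why the task is skipped once database.db has been created.

```python
import os
import sqlite3
import tempfile


def default_complete(output_paths):
    """Mimic Luigi's default completeness check for file targets.

    Hypothetical helper: Luigi's real Task.complete() calls exists() on each
    target from output(); for a LocalTarget that is a plain path check.
    """
    # Luigi warns and treats a task without outputs as incomplete.
    if not output_paths:
        return False
    return all(os.path.exists(p) for p in output_paths)


with tempfile.TemporaryDirectory() as tmp:
    db_path = os.path.join(tmp, "database.db")
    print(default_complete([db_path]))  # False: file not created yet

    # The first run creates the file ...
    conn = sqlite3.connect(db_path)
    conn.execute("CREATE TABLE prod (id INTEGER)")
    conn.commit()
    conn.close()

    # ... and from then on the task counts as complete, whatever the data.
    print(default_complete([db_path]))  # True
```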

Maybe I have misunderstood how LocalTarget is meant to be used. What is the right way to approach this?

EDIT: My question applies to the example given on this page (code for le_create_db.py). How would you handle updates and inserts in that example?

EDIT: This question about appending to a file is similar, but its marker-file solution does not work here because luigi.contrib.sqla expects an SQLAlchemyTarget output. Are there other answers, specifically for appending to a database?

Upvotes: 2

Views: 517

Answers (3)

miku

Reputation: 188004

There are three options, in general:

  • update only once (only if the db file does not exist yet)
  • update every time (override complete() and always return False)
  • customize what "complete" means (e.g. decide, based on how recent the last snapshot in the database is, whether a new one should be taken)

Here are all three variants side by side in a single snippet: https://gist.github.com/miku/fb53af7576bc3e5dd1df3ad627db4d07

Only once:

def output(self):
    # (1) will run only once
    return luigi.LocalTarget(dbfile)

Always:

def complete(self):
    # (2) will run every time the task is called
    return False

Custom logic:

def complete(self):
    # (3) custom logic, e.g. backoff:
    # skip the task if the last snapshot happened recently
    with sqlite3.connect(dbfile) as db:
        cursor = db.cursor()
        cursor.execute(
            """
            select strftime('%s', 'now') - strftime('%s', t)
            from dummy
            order by t desc
            limit 1
            """
        )
        row = cursor.fetchone()
        if row is None:
            return False
        # seconds since the newest snapshot (t looks like 2023-09-14 09:25:33)
        diff_s = row[0]

    # complete (i.e. skip the run) if the last snapshot is under 10 s old
    return int(diff_s) < 10
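
The recency check can be tried outside Luigi. This sketch assumes a table named snapshots with a UTC timestamp column t in SQLite's 'YYYY-MM-DD HH:MM:SS' form (both hypothetical, standing in for the dummy table in the gist), and it treats a missing or empty table as "not complete" so the very first run is never skipped.

```python
import sqlite3


def snapshot_is_recent(conn, max_age_s=10):
    """Return True if the newest row in `snapshots` is younger than max_age_s.

    Assumes a hypothetical table snapshots(t TEXT) holding UTC timestamps
    such as those produced by SQLite's datetime('now').
    """
    try:
        row = conn.execute(
            """
            SELECT strftime('%s', 'now') - strftime('%s', t)
            FROM snapshots
            ORDER BY t DESC
            LIMIT 1
            """
        ).fetchone()
    except sqlite3.OperationalError:
        # Table does not exist yet: nothing has been loaded.
        return False
    if row is None or row[0] is None:
        return False
    return int(row[0]) < max_age_s


conn = sqlite3.connect(":memory:")
print(snapshot_is_recent(conn))  # False: no table yet
conn.execute("CREATE TABLE snapshots (t TEXT)")
conn.execute("INSERT INTO snapshots VALUES (datetime('now'))")
print(snapshot_is_recent(conn))  # True: snapshot just taken
conn.execute("DELETE FROM snapshots")
conn.execute("INSERT INTO snapshots VALUES (datetime('now', '-1 hour'))")
print(snapshot_is_recent(conn))  # False: older than 10 s
```

Inside a Luigi task, complete() would open the database and return this function's result.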

Upvotes: 0

kchomski

Reputation: 3010

I had the same issue and was able to solve it by overriding the complete method to simply return False:

def complete(self):
    return False

Now the task is re-run every time, even if the database file is present.
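
One caveat with always returning False: the run() body from the question (INSERT INTO prod SELECT * FROM staging) then executes on every invocation and appends the same staging rows again. If prod has a primary key (an assumption here, with hypothetical columns id and value), SQLite's INSERT OR REPLACE makes the load safe to repeat: conflicting rows are overwritten and new ones inserted.

```python
import sqlite3


def load_staging_into_prod(conn):
    """Upsert every staging row into prod; safe to run repeatedly.

    Assumes prod.id is a PRIMARY KEY, so a conflicting row is replaced
    instead of duplicated.
    """
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT OR REPLACE INTO prod (id, value) SELECT id, value FROM staging"
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE staging (id INTEGER PRIMARY KEY, value TEXT)")
conn.execute("CREATE TABLE prod (id INTEGER PRIMARY KEY, value TEXT)")
conn.executemany("INSERT INTO staging VALUES (?, ?)", [(1, "a"), (2, "b")])
conn.commit()

load_staging_into_prod(conn)
load_staging_into_prod(conn)  # a second run does not duplicate rows

conn.execute("UPDATE staging SET value = 'b2' WHERE id = 2")
conn.execute("INSERT INTO staging VALUES (3, 'c')")
load_staging_into_prod(conn)  # updates id 2, inserts id 3

print(conn.execute("SELECT id, value FROM prod ORDER BY id").fetchall())
# [(1, 'a'), (2, 'b2'), (3, 'c')]
```

INSERT OR REPLACE deletes the conflicting row before inserting the new one; on SQLite 3.24 or later, INSERT ... ON CONFLICT(id) DO UPDATE is an alternative that updates the row in place.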

Upvotes: 0

Jesus Sono

Reputation: 78

Consider using a mock file, as described here: http://gouthamanbalaraman.com/blog/building-luigi-task-pipeline.html

That way, each execution creates a new file.
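
A related way to get a new file per execution (not necessarily what the linked post does) is to put the run date in the target path, so each day's run has its own marker and older markers no longer block it. In Luigi this is usually a luigi.DateParameter plus a LocalTarget whose path contains the date; the stdlib sketch below only mimics the existence check, and the load.<date>.done naming and helper names are hypothetical.

```python
import datetime
import os
import tempfile


def marker_path(base_dir, run_date):
    """Hypothetical per-run marker: one file per date, e.g. load.2023-09-14.done."""
    return os.path.join(base_dir, f"load.{run_date.isoformat()}.done")


def run_if_needed(base_dir, run_date, load):
    """Run `load` unless this date's marker exists; write the marker afterwards."""
    path = marker_path(base_dir, run_date)
    if os.path.exists(path):
        return False  # already done for this date
    load()
    # Written only after a successful load, so a failed run is retried.
    with open(path, "w") as f:
        f.write("ok\n")
    return True


calls = []
with tempfile.TemporaryDirectory() as tmp:
    day1 = datetime.date(2023, 9, 14)
    day2 = day1 + datetime.timedelta(days=1)
    print(run_if_needed(tmp, day1, lambda: calls.append(day1)))  # True
    print(run_if_needed(tmp, day1, lambda: calls.append(day1)))  # False
    print(run_if_needed(tmp, day2, lambda: calls.append(day2)))  # True
```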

Another option is to create a marker table inside the database, as luigi.contrib.postgres.PostgresTarget does: https://luigi.readthedocs.io/en/stable/api/luigi.contrib.postgres.html#luigi.contrib.postgres.PostgresTarget
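
The marker-table idea can be sketched with sqlite3 alone. PostgresTarget records each finished update_id in a marker table (table_updates by default); the code below mimics that in SQLite with a hypothetical table_updates(update_id, target_table, inserted) table and writes the data and the marker in one transaction, so a failed load leaves no marker. The table layout and function names are assumptions, not Luigi's API.

```python
import sqlite3


def ensure_marker_table(conn):
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS table_updates (
            update_id    TEXT PRIMARY KEY,
            target_table TEXT,
            inserted     TEXT DEFAULT CURRENT_TIMESTAMP
        )
        """
    )


def is_done(conn, update_id):
    """Marker-table version of complete(): has this update already run?"""
    ensure_marker_table(conn)
    row = conn.execute(
        "SELECT 1 FROM table_updates WHERE update_id = ?", (update_id,)
    ).fetchone()
    return row is not None


def append_batch(conn, update_id, rows):
    """Append rows to prod and record the marker atomically."""
    with conn:  # one transaction: data and marker commit (or roll back) together
        ensure_marker_table(conn)
        conn.executemany("INSERT INTO prod (value) VALUES (?)", rows)
        conn.execute(
            "INSERT INTO table_updates (update_id, target_table) VALUES (?, ?)",
            (update_id, "prod"),
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE prod (value TEXT)")

runs = [
    ("load-2023-09-14", [("a",), ("b",)]),
    ("load-2023-09-14", [("a",), ("b",)]),  # repeated run is skipped
    ("load-2023-09-15", [("c",)]),
]
for update_id, batch in runs:
    if not is_done(conn, update_id):
        append_batch(conn, update_id, batch)

print(conn.execute("SELECT COUNT(*) FROM prod").fetchone()[0])  # 3
```

Each new batch gets a new update_id, so appending keeps working while an update_id that already ran is skipped.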

Upvotes: 0
