Luigi/SQLite:如何在初始加载后更新数据库?

问题描述 投票:0回答:3

我使用以下代码通过 Luigi 将数据加载到 SQLite 数据库中:

class LoadData(luigi.Task):   
    
    def requires(self):
        return TransformData()
        
    def run(self):
        with sqlite3.connect('database.db') as db:
            cursor = db.cursor()
            cursor.execute("INSERT INTO prod SELECT * FROM staging;")
    def output(self):
        return luigi.LocalTarget('database.db')

这可行,但是当我想更新或插入新数据时,任务不会执行,因为 Luigi 认为它已完成(

database.db
已经存在)

可能是我没有理解LocalTarget的好用。解决这个问题的正确方法是什么?

///编辑:我的问题适用于此页面上给出的示例(

le_create_db.py
的代码)。您如何解决该示例中的更新和插入问题?

///编辑:关于附加到文件的这个问题类似,但是使用标记文件的解决方案不起作用,因为 sqla 需要

SQLAlchemyTarget
输出。还有其他答案吗,特别是关于附加到数据库的答案吗?

python sqlite etl luigi
3个回答
0
投票

考虑使用模拟文件: http://gouthamanbalaraman.com/blog/building-luigi-task-pipeline.html

在每次执行中,您将创建一个新文件。

另一种解决方案可以使用在数据库内创建标记表的策略,例如: https://luigi.readthedocs.io/en/stable/api/luigi.contrib.postgres.html#luigi.contrib.postgres .PostgresTarget


0
投票

我遇到了同样的问题,并且能够通过重写

complete
方法来简单地返回
False
:

来解决它
def complete(self):
    return False

现在,即使存在数据库文件,任务也会重新运行。


0
投票

一般有以下三种选择:

  • 仅更新一次(仅当数据库文件不存在时)
  • 每次更新(覆盖
    complete
    并始终返回
    False
  • 自定义
    complete
    的含义(例如,决定数据库中最近的最后一个快照,如果需要创建新快照)

以下是单个片段中并排的所有三个变体:https://gist.github.com/miku/fb53af7576bc3e5dd1df3ad627db4d07

© www.soinside.com 2019 - 2024. All rights reserved.