AWS 胶水。如何为作业书签创建复合键?

问题描述 投票:0回答:2

我有一个带有表的 JDBC 源 (PostgreSQL),我想通过 Glue 获取该表。

我的表格有列:

id          (bigint)
name        (string)
updated_at  (timestamp)

我已经使用爬虫在 Glue 数据目录中设置了表,设置了作业并启用了作业书签。

当我运行作业时,它会自动通过新 id 定义新行。

但我想使用复合键 -> [ id + Updated_at ]。

它将允许我检测源表中的所有更新。

我该怎么做?

AWS 文档说此功能可用 (https://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html):

For JDBC sources, the following rules apply:
   * For each table, AWS Glue uses one or more columns as bookmark keys to determine new and processed data. The bookmark keys combine to form a single compound key.
   * You can specify the columns to use as bookmark keys. If you don't specify bookmark keys, AWS Glue by default uses the primary key as the bookmark key, provided that it is sequentially increasing or decreasing (with no gaps).

我应该手动定义表(不使用爬虫)吗?

谢谢!

aws-glue
2个回答
0
投票

有关使用作业书签的 AWS 文档指出,您可以在 Glue 作业脚本中指定

jobBookmarkKeys
jobBookmarkKeysSortOrder

您可以通过以下方式指定 jobBookmarkKeys 和 jobBookmarkKeysSortOrder:

create_dynamic_frame.from_catalog — 使用additional_options。 create_dynamic_frame.from_options — 使用connection_options。

作为示例,当 Glue 作业中有 JDBC 源时,从生成的脚本开始,您可以添加以下行:

your_node = glueContext.create_dynamic_frame.from_options(
    connection_type="sqlserver",
    connection_options={
        "useConnectionProperties": "true",
        "dbtable": "your.table",
        "connectionName": "Your Connection Name",
        # Add the following two lines:
        "jobBookmarkKeys": ["updated_at", "id"],
        "jobBookmarkKeysSortOrder": "asc",
    },
    transformation_ctx="your_node",
)

两个作业书签选项的类型可以在 AWS Glue 文档中找到。

jobBookmarkKeys
— 列名称数组。

jobBookmarkKeysSortOrder
— 定义如何根据排序顺序比较值的字符串。有效值:“asc”、“desc”。

注意,对于

jobBookmarkKeys
列名称的顺序很重要

如果您运行此 Glue 作业并在 Cloudwatch 中搜索执行程序日志,您将找到用于查找最新复合键的 SQL 查询。

jobBookmarkKeys
中的列顺序定义了此 SQL 语句中用于查找最新复合键的 ORDER BY 子句的顺序。

SELECT
    *
FROM (
    select TOP 1
        updated_at,
        id
    from
        your.table
    order by
        updated_at DESC,
        id DESC)

对于要在增量摄取中选取的行,复合键中的每一列都必须满足增量条件。在此示例中,行的 Updated_at id 都必须大于前一个书签的 Updated_at 和 id 值。以下是 Cloudwatch 日志中的 SQL 语句:

SELECT
    *
FROM (
    SELECT
        *
    FROM
        your.table
    WHERE ((updated_at > '2023-01-01 14:15:00.0')
        or(updated_at = '2023-01-01 14:15:00.0'
            AND id > '1234'))
    and((updated_at < '2023-01-01 14:30:00.0')
    or(updated_at = '2023-01-01 14:30:00.0'
        AND id <= '5678')))

-1
投票
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database = "hr", table_name = "emp",
    transformation_ctx = "datasource0",
    additional_options = {
        "jobBookmarkKeys": ["empno"],
        "jobBookmarkKeysSortOrder": "asc"
    }
)
© www.soinside.com 2019 - 2024. All rights reserved.