我有一个带有表的 JDBC 源 (PostgreSQL),我想通过 Glue 获取该表。
我的表格有列:
id (bigint)
name (string)
updated_at (timestamp)
我已经使用爬虫在 Glue 数据目录中设置了表,设置了作业并启用了作业书签。
当我运行作业时,它会自动通过新 id 定义新行。
但我想使用复合键 -> [ id + Updated_at ]。
它将允许我检测源表中的所有更新。
我该怎么做?
AWS 文档说此功能可用 (https://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html):
For JDBC sources, the following rules apply:
* For each table, AWS Glue uses one or more columns as bookmark keys to determine new and processed data. The bookmark keys combine to form a single compound key.
* You can specify the columns to use as bookmark keys. If you don't specify bookmark keys, AWS Glue by default uses the primary key as the bookmark key, provided that it is sequentially increasing or decreasing (with no gaps).
我应该手动定义表(不使用爬虫)吗?
谢谢!
有关使用作业书签的 AWS 文档指出,您可以在 Glue 作业脚本中指定
jobBookmarkKeys
和 jobBookmarkKeysSortOrder
。
您可以通过以下方式指定 jobBookmarkKeys 和 jobBookmarkKeysSortOrder:
create_dynamic_frame.from_catalog — 使用additional_options。 create_dynamic_frame.from_options — 使用connection_options。
作为示例,当 Glue 作业中有 JDBC 源时,从生成的脚本开始,您可以添加以下行:
your_node = glueContext.create_dynamic_frame.from_options(
connection_type="sqlserver",
connection_options={
"useConnectionProperties": "true",
"dbtable": "your.table",
"connectionName": "Your Connection Name",
# Add the following two lines:
"jobBookmarkKeys": ["updated_at", "id"],
"jobBookmarkKeysSortOrder": "asc",
},
transformation_ctx="your_node",
)
两个作业书签选项的类型可以在 AWS Glue 文档中找到。
— 列名称数组。jobBookmarkKeys
— 定义如何根据排序顺序比较值的字符串。有效值:“asc”、“desc”。jobBookmarkKeysSortOrder
注意,对于
jobBookmarkKeys
,列名称的顺序很重要。
如果您运行此 Glue 作业并在 Cloudwatch 中搜索执行程序日志,您将找到用于查找最新复合键的 SQL 查询。
jobBookmarkKeys
中的列顺序定义了此 SQL 语句中用于查找最新复合键的 ORDER BY 子句的顺序。
SELECT
*
FROM (
select TOP 1
updated_at,
id
from
your.table
order by
updated_at DESC,
id DESC)
对于要在增量摄取中选取的行,复合键中的每一列都必须满足增量条件。在此示例中,行的 Updated_at 和 id 都必须大于前一个书签的 Updated_at 和 id 值。以下是 Cloudwatch 日志中的 SQL 语句:
SELECT
*
FROM (
SELECT
*
FROM
your.table
WHERE ((updated_at > '2023-01-01 14:15:00.0')
or(updated_at = '2023-01-01 14:15:00.0'
AND id > '1234'))
and((updated_at < '2023-01-01 14:30:00.0')
or(updated_at = '2023-01-01 14:30:00.0'
AND id <= '5678')))
datasource0 = glueContext.create_dynamic_frame.from_catalog(
database = "hr", table_name = "emp",
transformation_ctx = "datasource0",
additional_options = {
"jobBookmarkKeys": ["empno"],
"jobBookmarkKeysSortOrder": "asc"
}
)