Kafka Connect FilePulse: apply filters on CSV file columns before loading into a Kafka topic


I am using the FilePulse connector to load CSV file contents into a Kafka topic, and that part works fine. A sample of my CSV file content is shown below:

```
reference_key,service_type,event_timestamp,cell_id,site_id,serial_no,model_no,tac,current_date
1124545422342,VAT,1672804433000,28668,ULG2866,,,,20230104
1124545422342,VAT,1672804497000,28668,ULG2866,,,,20230104
1124545422342,VAT,1672802056000,,,357416315997720F,,35741,20230104
0342455432268,VAT,1672802123000,,,357464315455830F,,35746,20230104
0342455432268,VAT,1672802335000,14458,ULG1445,,,,20230104
9907033485252,WHO,1672873387000,,,,,,20230104
9907033485252,WHO,1672873387000,,,,,,20230104
0,TAX,1672873451000,,,,,,20230104
5456035928014,ADD,1672873451000,,,,,,20230104
5648136134038,ADD,1672873451000,,,,,,20230104
```

Now I need help with the correct/valid syntax for accessing the row values of specific columns with the Kafka Connect FilePulse library and applying some filtering (e.g. DropFilter, GrokFilter), so that only matching row records are published to the target Kafka topic.

I currently have the following configuration, but the DropFilter and the GrokFilter are not working:

```
name=connect-file-pulse-quickstart-csv
connector.class=io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector
topic=connect-file-pulse-quickstart-csv
tasks.max=1

filters=ParseDelimitedRow,ParseGrokFilterRow,DropMatchedRow

# Delimited Row filter
filters.ParseDelimitedRow.extract.column.name=headers
filters.ParseDelimitedRow.trim.column=true
filters.ParseDelimitedRow.regex=\\|
filters.ParseDelimitedRow.type=io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter

# Grok Row filter
filters.ParseGrokFilterRow.type=io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter
#filters.ParseGrokFilterRow.pattern=(?i)vat|add
filters.ParseGrokFilterRow.source=message
filters.ParseGrokFilterRow.overwrite=message
filters.ParseGrokFilterRow.ignoreFailure=true

# Drop Row Filter
filters.DropMatchedRow.type=io.streamthoughts.kafka.connect.filepulse.filter.DropFilter
filters.DropMatchedRow.extract.column.name=headers
filters.DropMatchedRow.values={{ extract_array($.headers, 0) }}
#filters.DropMatchedRow.if={{ equals(extract_array($.headers, 0), 0) }}
filters.DropMatchedRow.invert=false

skip.headers=1
tasks.reader.class=io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader

# File Listing
##fs.listing.filters=io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing
fs.listing.filters=io.streamthoughts.kafka.connect.filepulse.fs.filter.IgnoreHiddenFileListFilter,io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter
fs.listing.interval.ms=10000
fs.listing.directory.path=/my/target_directory
fs.listing.recursive.enabled=true
fs.listing.class=io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing

file.filter.regex.pattern=.*\\.csv$

fs.cleanup.policy.class=io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy

# Internal Reporting
tasks.file.status.storage.bootstrap.servers=localhost:9092
tasks.file.status.storage.topic=connect-file-pulse-status

# Track file by name and hash
offset.attributes.string=name+hash
```

The DropFilter should be applied to the reference_key column to drop rows whose value is zero, and the GrokFilter should be applied to the service_type column to match rows whose value is VAT or ADD, using a case-insensitive comparison. Any help/solution for this challenge would be greatly appreciated.
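For reference, here is a minimal sketch of how those two conditions might be expressed with DropFilter conditions alone (i.e. without a GrokFilter). It assumes the DelimitedRowFilter names the columns from the header row (via skip.headers=1 and extract.column.name=headers), that the separator/regex matches the comma used in the sample file, and that the SCEL functions equals, lowercase and matches are available in the installed FilePulse version; the filter aliases DropZeroReference and KeepVatOrAdd are made up for illustration:

```
filters=ParseDelimitedRow,DropZeroReference,KeepVatOrAdd

# Split each line into named columns (assumes the column separator is a comma, as in the sample file)
filters.ParseDelimitedRow.type=io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter
filters.ParseDelimitedRow.extract.column.name=headers
filters.ParseDelimitedRow.trim.column=true
filters.ParseDelimitedRow.regex=,

# Drop rows whose reference_key column equals zero
filters.DropZeroReference.type=io.streamthoughts.kafka.connect.filepulse.filter.DropFilter
filters.DropZeroReference.if={{ equals($value.reference_key, '0') }}
filters.DropZeroReference.invert=false

# Keep only rows whose service_type is VAT or ADD (case-insensitive):
# invert=true drops every record for which the condition is NOT true
filters.KeepVatOrAdd.type=io.streamthoughts.kafka.connect.filepulse.filter.DropFilter
filters.KeepVatOrAdd.if={{ matches(lowercase($value.service_type), 'vat|add') }}
filters.KeepVatOrAdd.invert=true
```

With this approach the GrokFilter would not be needed for the column matching; if a GrokFilter is kept in the chain, note that its pattern property is commented out in the configuration above, so it currently has nothing to match against.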

apache-kafka-connect