我正在做一个有CDC概念的项目。它从数据库中读取更改并将事件推送到rabbitmq队列(它使用debezium)。
之后,我使用 KibaETL 处理从该队列消耗的事件消息。
KibaETL 有 3 个主要流程
奇怪的是。当我打电话给法拉第时,它默默地退出了 kiba etl 管道。它没有显示任何错误......
这是我的转换类的片段。
#frozen_string_literal: true
require 'faraday'
require 'faraday/net_http'
require 'json'
require 'kiba'
class RetrieveJobDataTransform
def process(row)
puts "\t[transform] Retrieve Job Information by UUID - job_uuid: #{row}"
raise ArgumentError, 'row cannot be nil' if row.nil?
job_uuid = row.dig(:after_data, 'job_uuid')
api_result = make_api_call(job_uuid)
transformed_data = transform_data(row, api_result)
transformed_data
end
private
def make_api_call(job_uuid)
conn = create_connection('https://url.com')
**# before conn.get call, it shows output properly**
response = conn.get("api/jobs/#{job_uuid}")
**# here it doesnt show any output when I was debugging**
if response.success?
JSON.parse(response.body)
else
nil
end
end
def transform_data(row, api_result)
row = api_result
end
def create_connection(api_url)
Faraday.new(api_url) do |f|
f.headers['x-api-key'] = 'api key'
f.response :json
f.adapter :net_http
f.options.timeout = 120
end
end
end
以及被调用来处理ETL的主类
job = Kiba.parse do
source KibaSource, bunny_adapter, queue_name
transform ParseMessageTransform
transform ValidateMessageTransform
**#This is the class that silently quit with no error or warnings**
transform RetrieveJobDataTransform
destination KibaDestination
end
puts '# job started - Running Kiba job... '
Kiba.run(job)
奇怪的行为是,如果我输入“irb”并添加该类代码并在不位于 kiba etl 管道内部的情况下调用它,它就不会出现错误。
有人可以给我任何建议吗?我陷入这个问题,不知道如何解决它。
提前致谢
干杯!
如果它实际上与 Kiba 相关(即使你可以在 IRB 中工作),我会感到相当惊讶,因为 Kiba 是如何工作的。但我很想排除这种可能性。
您是否愿意创建尽可能小的复制品(也许使用单文件 Bundler 配置https://bundler.io/guides/bundler_in_a_single_file_ruby_script.html),其中您可以“存根”源和目标?
可能会发生奇怪的事情,因为您正在为每一行重新创建法拉第连接。
我也可能会尝试使用另一个 HTTP 客户端,例如HTTParty 用于您正在拨打的电话。由于某种原因,您的调用和 RabbitMQ 调用之间可能会发生一些冲突!
很高兴为您提供进一步的帮助,如果您可以通过 gist.github.com 发布最小的重现,我一定能够提供帮助。