Kiba ETL 和法拉第请求

问题描述 投票:0回答:1

我正在做一个有CDC概念的项目。它从数据库中读取更改并将事件推送到rabbitmq队列(它使用debezium)。

之后,我使用 KibaETL 处理从该队列消耗的事件消息。

KibaETL 有 3 个主要流程

  • 源(我从rabbitmq队列获取并且工作正常)
  • transform(我解析 json 消息,并尝试创建一个外部 API 来检索一些附加数据。
  • 目的地(我正在测试保存到本地文件)

奇怪的是。当我打电话给法拉第时,它默默地退出了 kiba etl 管道。它没有显示任何错误......

这是我的转换类的片段。

#frozen_string_literal: true

require 'faraday'
require 'faraday/net_http'
require 'json'
require 'kiba'

class RetrieveJobDataTransform
  def process(row)
    puts "\t[transform] Retrieve Job Information by UUID - job_uuid: #{row}"
    raise ArgumentError, 'row cannot be nil' if row.nil?

    job_uuid = row.dig(:after_data, 'job_uuid')
    
    api_result = make_api_call(job_uuid)

    transformed_data = transform_data(row, api_result)

    transformed_data
  end

  private

  def make_api_call(job_uuid)    
    
    conn = create_connection('https://url.com')    
    
    **# before conn.get call, it shows output properly**
    response = conn.get("api/jobs/#{job_uuid}")
    **# here it doesnt show any output when I was debugging**

    if response.success?
      JSON.parse(response.body)
    else
      nil
    end
  end

  def transform_data(row, api_result)
    row = api_result
  end

  def create_connection(api_url)
    Faraday.new(api_url) do |f|
      f.headers['x-api-key'] = 'api key'
      f.response :json 
      f.adapter :net_http 
      f.options.timeout = 120
    end
  end
end

以及被调用来处理ETL的主类

job = Kiba.parse do
  source KibaSource, bunny_adapter, queue_name

  transform ParseMessageTransform
  transform ValidateMessageTransform

  **#This is the class that silently quit with no error or warnings**
  transform RetrieveJobDataTransform 
  
  destination KibaDestination
end

puts '# job started - Running Kiba job... '
Kiba.run(job)

奇怪的行为是,如果我输入“irb”并添加该类代码并在不位于 kiba etl 管道内部的情况下调用它,它就不会出现错误。

有人可以给我任何建议吗?我陷入这个问题,不知道如何解决它。

提前致谢

干杯!

ruby etl kiba-etl
1个回答
0
投票

如果它实际上与 Kiba 相关(即使你可以在 IRB 中工作),我会感到相当惊讶,因为 Kiba 是如何工作的。但我很想排除这种可能性。

您是否愿意创建尽可能小的复制品(也许使用单文件 Bundler 配置https://bundler.io/guides/bundler_in_a_single_file_ruby_script.html),其中您可以“存根”源和目标?

可能会发生奇怪的事情,因为您正在为每一行重新创建法拉第连接。

我也可能会尝试使用另一个 HTTP 客户端,例如HTTParty 用于您正在拨打的电话。由于某种原因,您的调用和 RabbitMQ 调用之间可能会发生一些冲突!

很高兴为您提供进一步的帮助,如果您可以通过 gist.github.com 发布最小的重现,我一定能够提供帮助。

© www.soinside.com 2019 - 2024. All rights reserved.