Logstash codec avro_schema_registry fails with "undefined local variable or method `esponse'"


I have a Logstash config that reads JSON from a Kafka topic and uses the avro_schema_registry codec to serialize the output as Avro. This is the conf file:

input {
  kafka{
    group_id => "test_group"
    topics => ["logs_json"]
    bootstrap_servers => "server2:9094, server1:9094, server3:9094"
    codec => "json"
    consumer_threads => 1
  }
}

output {
  kafka {
    codec => avro_schema_registry {
      endpoint => "http://host_schema_registry:port"
      schema_id  => 1
    }
    value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
    bootstrap_servers => "server1:9094, server1:9094, server1:9094"
    topic_id => "logs_avro"
  }
}

But I am getting this error:

org.jruby.exceptions.NameError: (NameError) undefined local variable or method `esponse' for #<SchemaRegistry::Client:0x3c5ad39>
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.schema_registry_minus_0_dot_1_dot_0.lib.schema_registry.client.request(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/schema_registry-0.1.0/lib/schema_registry/client.rb:127) ~[?:?]
        at uri_3a_classloader_3a_.META_minus_INF.jruby_dot_home.lib.ruby.stdlib.net.http.start(uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/net/http.rb:914) ~[?:?]
        at uri_3a_classloader_3a_.META_minus_INF.jruby_dot_home.lib.ruby.stdlib.net.http.start(uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/net/http.rb:609) ~[?:?]
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.schema_registry_minus_0_dot_1_dot_0.lib.schema_registry.client.request(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/schema_registry-0.1.0/lib/schema_registry/client.rb:101) ~[?:?]
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.schema_registry_minus_0_dot_1_dot_0.lib.schema_registry.client.schema(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/schema_registry-0.1.0/lib/schema_registry/client.rb:40) ~[?:?]
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_codec_minus_avro_schema_registry_minus_1_dot_1_dot_1.lib.logstash.codecs.avro_schema_registry.get_schema(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro_schema_registry-1.1.1/lib/logstash/codecs/avro_schema_registry.rb:158) ~[?:?]
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_codec_minus_avro_schema_registry_minus_1_dot_1_dot_1.lib.logstash.codecs.avro_schema_registry.encode(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro_schema_registry-1.1.1/lib/logstash/codecs/avro_schema_registry.rb:246) ~[?:?]
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_0_dot_0_minus_java.lib.logstash.outputs.kafka.multi_receive(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.0.0-java/lib/logstash/outputs/kafka.rb:219) ~[?:?]
        at org.jruby.RubyArray.each(org/jruby/RubyArray.java:1800) ~[jruby-complete-9.2.8.0.jar:?]
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_0_dot_0_minus_java.lib.logstash.outputs.kafka.multi_receive(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.0.0-java/lib/logstash/outputs/kafka.rb:217) ~[?:?]
        at org.logstash.config.ir.compiler.OutputStrategyExt$AbstractOutputStrategyExt.multi_receive(org/logstash/config/ir/compiler/OutputStrategyExt.java:118) ~[logstash-core.jar:?]
        at org.logstash.config.ir.compiler.AbstractOutputDelegatorExt.multi_receive(org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java:101) ~[logstash-core.jar:?]
        at usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.start_workers(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:262) ~[?:?]
[2020-02-11T13:11:41,720][ERROR][org.logstash.execution.WorkerLoop][main] Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash.
ruby apache-kafka logstash jruby
1 Answer

That codec is broken.

Refer to the open issue - https://github.com/wvanbergen/schema_registry/issues/5
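The stack trace shows the failure inside the schema_registry gem (0.1.0): client.rb line 127 references a variable `esponse`, presumably a typo for `response`. Until the gem is fixed, one untested workaround is to patch the vendored file in place. A minimal sketch, assuming the typo is the only problem on that line (the path is taken from the stack trace above; back the file up first):

# Correct the presumed typo in the vendored gem (GNU sed; \b matches word
# boundaries, so the correctly spelled `response` is left untouched).
sed -i 's/\besponse\b/response/g' \
  /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/schema_registry-0.1.0/lib/schema_registry/client.rb

If, as the trace suggests, the typo sits on an error-handling path, then with it patched the error the registry request actually hit should surface instead of the NameError, pointing at the real problem (for example an unreachable or misconfigured endpoint).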


There is no reason to serialize JSON into Avro if you are then still going to index the data into Elasticsearch, because Elasticsearch stores JSON anyway; but if you really want to go this route, I suggest using Confluent's Elasticsearch Kafka Connector instead.
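For reference, a minimal sink-connector sketch that consumes the JSON topic directly, with no Avro step in between (the connector name and Elasticsearch URL are hypothetical placeholders; it assumes the Confluent Elasticsearch sink connector is installed on the Kafka Connect worker):

{
  "name": "logs-es-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "logs_json",
    "connection.url": "http://elasticsearch:9200",
    "key.ignore": "true",
    "schema.ignore": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}

Submitting this to the Kafka Connect REST API (a POST to /connectors on the Connect worker) writes the events from logs_json straight into Elasticsearch.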
