弹性搜索摄取附件插件块

问题描述 投票:0回答:1

我正在使用NEST(C#)和ingest attachment plugin将数千个文档中的10个文档摄取到Elastic搜索实例中。不幸的是,过了一段时间,一切都停滞不前 - 即没有更多的文件被摄取。日志显示:

[2019-02-20T17:35:07,528][INFO ][o.e.m.j.JvmGcMonitorService] [BwAAiDl] [gc][7412] overhead, spent [326ms] collecting in the last [1s]

不确定这是否告诉任何人什么?顺便说一句,是否有更有效的方法来摄取许多文档(而不是使用数千个REST请求)?

我正在使用这种代码:

client.Index(new Document
{
    Id = Guid.NewGuid(),
    Path = somePath,
    Content = Convert.ToBase64String(File.ReadAllBytes(somePath))
}, i => i.Pipeline("attachments"));

定义管道:

client.PutPipeline("attachments", p => p
    .Description("Document attachment pipeline")
    .Processors(pr => pr
        .Attachment<Document>(a => a
        .Field(f => f.Content)
        .TargetField(f => f.Attachment)
        )
        .Remove<Document>(r => r
        .Field(f => f.Content)
        )
    )
);
c# elasticsearch nest elastic-stack
1个回答
1
投票

日志表明在Elasticsearch服务器端执行垃圾收集花费了相当多的时间;这很可能是您所看到的大型停止事件的原因。如果您在群集上启用了监控(理想情况下将此类数据导出到单独的群集中),我会查看分析这些数据,看看它是否说明了为什么会发生大型GC。

是否有更有效的方法来摄取许多文档(而不是使用数千个REST请求)?

是的,您要在单独的索引请求中为每个附件编制索引。根据每个附件的大小,base64编码,您可能希望发送多个批量请求

// Your collection of documents
var documents = new[]
{
    new Document
    {
        Id = Guid.NewGuid(),
        Path = "path",
        Content = "content"
    },
    new Document
    {
        Id = Guid.NewGuid(),
        Path = "path",
        Content = "content" // base64 encoded bytes
    }
};

var client = new ElasticClient();

var bulkResponse = client.Bulk(b => b
    .Pipeline("attachments")
    .IndexMany(documents)
);

如果您正在从文件系统中读取文档,则可能需要懒惰地枚举它们并发送批量请求。在这里,您也可以使用BulkAll辅助方法。

首先要有一些懒惰的枚举文件集

public static IEnumerable<Document> GetDocuments()
{
    var count = 0;
    while (count++ < 20)
    {
        yield return new Document
        {
            Id = Guid.NewGuid(),
            Path = "path",
            Content = "content" // base64 encoded bytes
        };
    }
}

然后配置BulkAll调用

var client = new ElasticClient();

// set up the observable configuration
var bulkAllObservable = client.BulkAll(GetDocuments(), ba => ba
    .Pipeline("attachments")
    .Size(10)
);

var waitHandle = new ManualResetEvent(false);

Exception exception = null;

// set up what to do in response to next bulk call, exception and completion
var bulkAllObserver = new BulkAllObserver(
    onNext: response => 
    {
        // perform some action e.g. incrementing counter
        // to indicate how many have been indexed
    },
    onError: e =>
    {
        exception = e;
        waitHandle.Set();
    },
    onCompleted: () =>
    {
        waitHandle.Set();
    });

// start the observable process
bulkAllObservable.Subscribe(bulkAllObserver);

// wait for indexing to finish, either forever,
// or set a max timeout as here.
waitHandle.WaitOne(TimeSpan.FromHours(1));

if (exception != null)
    throw exception;

大小决定了每个请求中要发送的文档数量。对于集群的规模有多大没有硬性规定,因为它可能取决于许多因素,包括摄取管道,文档映射,文档字节大小,集群硬件等。您可以配置observable重试无法索引的文档,如果你看到es_rejected_execution_exception,你就可以达到你的集群可以同时处理的极限。

另一个建议是文档ID。我看到你正在使用新的Guids来处理文档,这对我来说意味着你并不关心每个文档的价值。如果是这种情况,我建议不要发送Id值,而是允许Elasticsearch为每个文档生成一个id。这很可能导致improvement in performance(我相信自从这篇文章以来,Elasticsearch和Lucene的实现已经略有改变,但重点仍然存在)。

© www.soinside.com 2019 - 2024. All rights reserved.