节流 Elasticsearch 服务器似乎因批量发帖而不堪重负?

问题描述 投票:0回答:1
Rust 新手,正在转换一个 Python 应用程序,该应用程序需要数百个 MS Word 文档,解析它们的所有文本,并将每个 Word 文档的文本划分为 10 行 Lucene 文档(“LDocs”)的重叠序列,其中然后使用

_bulk

 终点添加到索引中。
Rust 提供的令人兴奋的机会是进行适当的并行性,我天真地认为我已经成功地做到了这一点:对于每个 Word 文档,在其自己的线程中,我以特定的 ES 批量字符串格式构建一个批量字符串,然后在Word文档结束,提交(此ES 8.6.2服务器在端口9500上运行):

reqwest_client .post("https://localhost:9500/my_index/_bulk") .header("Content-type", "application/x-ndjson") .body(self.bulk_post_str.to_string()) .basic_auth("mike12", Some("mike12")) .send() .await? .text() .await?;

...一切似乎都工作正常...直到我检查了所谓提交的“LDocs”数量以及使用 
_count

终点在流程结束后发现的数量:通常提交了 29294 个,但实际上只有 18638 个完成了进入索引!

所以我决定检查一下上面代码中那些 Rust 

?

到底发生了什么。典型的错误(在

... send().await
)是“发送 url 请求(https://localhost:9500/my_index/_bulk):调度任务已消失:运行时删除了调度任务”。
从我到目前为止所读到的内容来看,罪魁祸首似乎是 ES 服务器。但如何检测被拒绝的批量帖子呢?我检查了上面第二个 

await

的字符串输出,这是说

errors: false

我想做什么

所以我的想法是“如果批量发布失败,请尝试,尝试,再试一次”。所以我改成这样:
let mut n_loop = 0; loop { n_loop += 1; // corrected on the suggestion of kmdreko: // std::thread::sleep(std::time::Duration::from_millis(1)); let _ = tokio::time::sleep(std::time::Duration::from_millis(1)); if stop_detected() { return Ok(()) } let url = format!("https://localhost:9500/{}/_bulk", INDEX_NAME.read().unwrap()); let send_result = reqwest_client .post(url) .header("Content-type", "application/x-ndjson") .body(self.bulk_post_str.to_string()) .basic_auth("mike12", Some("mike12")) .send() .await; let send_value = match send_result { Ok(send_value) => send_value, Err(e) => { warn!("attempt {n_loop} SEND post_bulk_string error was {}", e); continue }, }; let text_result = send_value.text().await; let _text_value = match text_result { Ok(text_value) => text_value, Err(e) => { warn!("attempt {n_loop} TEXT post_bulk_string error was {}", e); continue }, }; // TODO check that "errors" here is "false" // info!(">> text_value |{}...|", &text_value[0..100]); break } Ok(())

...现在我发现我得到的结果非常不一致:有时 ES 服务器似乎完全出错了,我最多尝试 6 次发布批量字符串,并且得到的“LDocs”比应有的要少。

但是,大多数时候,我现在发现索引中获得的“LDocs”比应有的多:当我运行解析过程中计算的 29294 的

_count

终点时,为 43062。因此,我开始怀疑其中一些

send().await
命令是否实际上有效(部分?),尽管仍然引发了
Err
是否有推荐的方法来检测 ES 服务器的问题,然后在从多个线程发出 

_bulk

请求时实施某种限制机制?

和/或,就此而言,是否有办法提高 ES 服务器接受和处理传入批量帖子的能力?例如,目前我认为我只有一个“碎片”。我对 ES 和分片不太了解,因为我只在本地机器(OS = W10)上使用过 ES。 ==>“单节点”(?)。

调用上面的代码


let rt = Runtime::new().unwrap(); rt.block_on(async move { if stop_detected() { return } match docx_document.post_bulk_string(&reqwest_client_clone).await { Ok(()) => (), Err(e) => { warn!("post_bulk_string error was {}", e); () } } });

每个并行线程都会创建一个新的
Runtime

。我不知道还有什么其他选择,或者这样做可能会出现什么问题。

    

elasticsearch rust parallel-processing throttling bulk
1个回答
0
投票

这里的错误和问题很大程度上是由于我没有意识到

reqwests::Client

有一个阻塞版本,即

reqwests::blocking::Client
。我的代码绝对不需要担心异步处理、
await
和 Futures。解决方案是在 Cargo.toml 中的
blocking
中添加
reqwests
功能。
对于我的数百个Word文档,生成的“LDocs”(Lucene文档)的计算数字是29294。现在我使用非异步解决方案,当我在生成的ES索引上使用

_count

端点时,数字非常漂亮始终为 29564,这表明我在解析过程中可能需要调整计数方法。

假设过程正确结束,有时

_count

终点会提供比该值更高的数字,有时会提供更低的数字。目前我不太确定是否需要做更多的事情来确保 ES 服务器始终接受正确数量的 LDoc,不多也不少,并将继续监控事情。

    

© www.soinside.com 2019 - 2024. All rights reserved.