_bulk
终点添加到索引中。Rust 提供的令人兴奋的机会是进行适当的并行性,我天真地认为我已经成功地做到了这一点:对于每个 Word 文档,在其自己的线程中,我以特定的 ES 批量字符串格式构建一个批量字符串,然后在Word文档结束,提交(此ES 8.6.2服务器在端口9500上运行):
reqwest_client
.post("https://localhost:9500/my_index/_bulk")
.header("Content-type", "application/x-ndjson")
.body(self.bulk_post_str.to_string())
.basic_auth("mike12", Some("mike12"))
.send()
.await?
.text()
.await?;
...一切似乎都工作正常...直到我检查了所谓提交的“LDocs”数量以及使用
_count
终点在流程结束后发现的数量:通常提交了 29294 个,但实际上只有 18638 个完成了进入索引!
所以我决定检查一下上面代码中那些 Rust?
到底发生了什么。典型的错误(在
... send().await
)是“发送 url 请求(https://localhost:9500/my_index/_bulk):调度任务已消失:运行时删除了调度任务”。从我到目前为止所读到的内容来看,罪魁祸首似乎是 ES 服务器。但如何检测被拒绝的批量帖子呢?我检查了上面第二个 await
的字符串输出,这是说
errors: false
。我想做什么
所以我的想法是“如果批量发布失败,请尝试,尝试,再试一次”。所以我改成这样:
let mut n_loop = 0;
loop {
n_loop += 1;
// corrected on the suggestion of kmdreko:
// std::thread::sleep(std::time::Duration::from_millis(1));
let _ = tokio::time::sleep(std::time::Duration::from_millis(1));
if stop_detected() {
return Ok(())
}
let url = format!("https://localhost:9500/{}/_bulk", INDEX_NAME.read().unwrap());
let send_result = reqwest_client
.post(url)
.header("Content-type", "application/x-ndjson")
.body(self.bulk_post_str.to_string())
.basic_auth("mike12", Some("mike12"))
.send()
.await;
let send_value = match send_result {
Ok(send_value) => send_value,
Err(e) => {
warn!("attempt {n_loop} SEND post_bulk_string error was {}", e);
continue
},
};
let text_result = send_value.text().await;
let _text_value = match text_result {
Ok(text_value) => text_value,
Err(e) => {
warn!("attempt {n_loop} TEXT post_bulk_string error was {}", e);
continue
},
};
// TODO check that "errors" here is "false"
// info!(">> text_value |{}...|", &text_value[0..100]);
break
}
Ok(())
...现在我发现我得到的结果非常不一致:有时 ES 服务器似乎完全出错了,我最多尝试 6 次发布批量字符串,并且得到的“LDocs”比应有的要少。
但是,大多数时候,我现在发现索引中获得的“LDocs”比应有的多:当我运行解析过程中计算的 29294 的
_count
终点时,为 43062。因此,我开始怀疑其中一些
send().await
命令是否实际上有效(部分?),尽管仍然引发了 Err
。是否有推荐的方法来检测 ES 服务器的问题,然后在从多个线程发出 _bulk
请求时实施某种限制机制?
和/或,就此而言,是否有办法提高 ES 服务器接受和处理传入批量帖子的能力?例如,目前我认为我只有一个“碎片”。我对 ES 和分片不太了解,因为我只在本地机器(OS = W10)上使用过 ES。 ==>“单节点”(?)。 调用上面的代码
let rt = Runtime::new().unwrap();
rt.block_on(async move {
if stop_detected() {
return
}
match docx_document.post_bulk_string(&reqwest_client_clone).await {
Ok(()) => (),
Err(e) => {
warn!("post_bulk_string error was {}", e);
()
}
}
});
每个并行线程都会创建一个新的
Runtime
。我不知道还有什么其他选择,或者这样做可能会出现什么问题。
这里的错误和问题很大程度上是由于我没有意识到
reqwests::Client
有一个阻塞版本,即
reqwests::blocking::Client
。我的代码绝对不需要担心异步处理、await
和 Futures。解决方案是在 Cargo.toml 中的 blocking
中添加 reqwests
功能。对于我的数百个Word文档,生成的“LDocs”(Lucene文档)的计算数字是29294。现在我使用非异步解决方案,当我在生成的ES索引上使用_count
端点时,数字非常漂亮始终为 29564,这表明我在解析过程中可能需要调整计数方法。
假设过程正确结束,有时_count
终点会提供比该值更高的数字,有时会提供更低的数字。目前我不太确定是否需要做更多的事情来确保 ES 服务器始终接受正确数量的 LDoc,不多也不少,并将继续监控事情。