我正在尝试解析一个csv文件并执行一个post调用,这是我到目前为止所实现的:
public static void main(String[] argc) {
Vertx vertx = Vertx.vertx();
WebClientOptions options = new WebClientOptions();
WebClient client = WebClient.create(vertx, options);
String uri = "/api/command";
String host = "mydomain";
vertx.fileSystem().rxReadFile("file.csv")
.flattenAsObservable(fileContent -> Lists.newArrayList(fileContent.toString().split("\n")))
.map(row -> row.split(";"))
.skip(1)
.map(row -> new JsonObject().put(name, row[0]))
.flatMap(row -> {
createCmd.put("CMD","CMD")
return client.post(host,uri)
.rxSendJson(createCmd)
.map((HttpResponse<Buffer> r) -> {
Long res = r.bodyAsJsonObject()
.getJsonObject("result")
.getJsonArray("hits")
.getJsonObject(0)
.getLong("res");
return row.put("res", res);
}).toObservable();
})
.doOnComplete(() -> System.out.println("On Complete: Completed the operation"))
.doOnTerminate(() -> System.out.println("On Terminate: Terminated the operation"))
.subscribe(
content -> System.out.println("Content: " + content),
err -> System.out.println("Cannot read the file: " + err.getMessage())
);
System.out.println("Out of the Vertx Read Stream");
}
为什么此代码从未到达最后一行?
System.out.println("Out of the Vertx Read Stream");
我是否以某种方式阻塞了线程的执行,并且卡住了或者是正常行为?
编辑:
我也收到此警告
Apr 10, 2020 5:57:51 PM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-worker-thread-5,5,main]=Thread[vert.x-worker-thread-5,5,main] has been blocked for 75944 ms, time limit is 60000 ms
io.vertx.core.VertxException: Thread blocked
只花了一些时间,什么是我的结果
import io.vertx.ext.web.client.WebClientOptions;
import io.vertx.rxjava.core.AbstractVerticle;
import io.vertx.rxjava.core.Vertx;
import io.vertx.rxjava.ext.web.client.WebClient;
public class Main extends AbstractVerticle {
@Override
public void start() throws Exception {
Vertx vertx = Vertx.vertx();
WebClientOptions options = new WebClientOptions();
WebClient client = WebClient.create(vertx, options);
String uri = "/api/command";
String host = "mydomain";
vertx.fileSystem().rxReadFile("file.csv")
.flattenAsObservable(fileContent -> Lists.newArrayList(fileContent.toString().split("\n"))) //The method flattenAsObservable((<no type> fileContent) -> {}) is undefined for the type Single<Buffer>
.map(row -> row.split(";"))
.skip(1)
.map(row -> new JsonObject().put(name, row[0])) // what is name here?
.flatMap(row -> {
createCmd.put("CMD","CMD")
return client.post(host,uri)
.rxSendJson(createCmd)
.map((HttpResponse<Buffer> r) -> {
Long res = r.bodyAsJsonObject()
.getJsonObject("result")
.getJsonArray("hits")
.getJsonObject(0)
.getLong("res");
return row.put("res", res);
}).toObservable();
})
.doOnComplete(() -> System.out.println("On Complete: Completed the operation"))
.doOnTerminate(() -> System.out.println("On Terminate: Terminated the operation"))
.subscribe(
content -> System.out.println("Content: " + content),
err -> System.out.println("Cannot read the file: " + err.getMessage())
);
System.out.println("Out of the Vertx Read Stream");
}
}