rxread in Java never stops

Problem description (votes: 1, answers: 1)

I am trying to parse a CSV file and make a POST call. This is what I have implemented so far:

public static void main(String[] argc) {

    Vertx vertx = Vertx.vertx();

    WebClientOptions options = new WebClientOptions();
    WebClient client = WebClient.create(vertx, options);
    String uri = "/api/command";
    String host = "mydomain";
    vertx.fileSystem().rxReadFile("file.csv")
            .flattenAsObservable(fileContent -> Lists.newArrayList(fileContent.toString().split("\n")))
            .map(row -> row.split(";"))
            .skip(1)
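            // NOTE: `name` is not defined anywhere in this snippet
            .map(row -> new JsonObject().put(name, row[0]))
            .flatMap(row -> {
                    // NOTE: `createCmd` is not defined in this snippet either
                    createCmd.put("CMD","CMD");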
                    return client.post(host,uri)
                                 .rxSendJson(createCmd)
                                 .map((HttpResponse<Buffer> r) -> {
                                     Long res = r.bodyAsJsonObject()
                                                      .getJsonObject("result")
                                                      .getJsonArray("hits")
                                                      .getJsonObject(0)
                                                      .getLong("res");
                                     return row.put("res", res);
                                     }).toObservable();
                    })
                    .doOnComplete(()  -> System.out.println("On Complete: Completed the operation"))
                    .doOnTerminate(() -> System.out.println("On Terminate: Terminated the operation"))
        .subscribe(
                content -> System.out.println("Content: " + content),
                err -> System.out.println("Cannot read the file: " + err.getMessage())
            );
    System.out.println("Out of the Vertx Read Stream");
}

Why does this code never reach the last line?

    System.out.println("Out of the Vertx Read Stream");

Am I somehow blocking the thread so that execution gets stuck, or is this normal behavior?

Edit:

I am also getting this warning:

Apr 10, 2020 5:57:51 PM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-worker-thread-5,5,main]=Thread[vert.x-worker-thread-5,5,main] has been blocked for 75944 ms, time limit is 60000 ms
io.vertx.core.VertxException: Thread blocked
java vert.x
1 answer
0 votes

I spent some time on this; here is what I ended up with:

import io.vertx.ext.web.client.WebClientOptions;
import io.vertx.rxjava.core.AbstractVerticle;
import io.vertx.rxjava.core.Vertx;
import io.vertx.rxjava.ext.web.client.WebClient;



public class Main extends AbstractVerticle {

@Override
public void start() throws Exception {

    Vertx vertx = Vertx.vertx();

    WebClientOptions options = new WebClientOptions();
    WebClient client = WebClient.create(vertx, options);
    String uri = "/api/command";
    String host = "mydomain";
    vertx.fileSystem().rxReadFile("file.csv")
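            // Compiler error on the next line: "The method flattenAsObservable((<no type> fileContent) -> {})
            // is undefined for the type Single<Buffer>".
            // Note: the io.vertx.rxjava.* imports above are the RxJava 1 bindings; flattenAsObservable
            // is an RxJava 2 operator, available through the io.vertx.reactivex.* bindings.
            .flattenAsObservable(fileContent -> Lists.newArrayList(fileContent.toString().split("\n")))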
            .map(row -> row.split(";"))
            .skip(1)
            .map(row -> new JsonObject().put(name, row[0])) // what is name here?
            .flatMap(row -> {
                    createCmd.put("CMD","CMD"); // createCmd is not defined in this snippet
                    return client.post(host,uri)
                                 .rxSendJson(createCmd)
                                 .map((HttpResponse<Buffer> r) -> {
                                     Long res = r.bodyAsJsonObject()
                                                      .getJsonObject("result")
                                                      .getJsonArray("hits")
                                                      .getJsonObject(0)
                                                      .getLong("res");
                                     return row.put("res", res);
                                     }).toObservable();
                    })
                    .doOnComplete(()  -> System.out.println("On Complete: Completed the operation"))
                    .doOnTerminate(() -> System.out.println("On Terminate: Terminated the operation"))
        .subscribe(
                content -> System.out.println("Content: " + content),
                err -> System.out.println("Cannot read the file: " + err.getMessage())
            );


      System.out.println("Out of the Vertx Read Stream");
     }
}
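
On the question of whether the line after `subscribe(...)` is being blocked: subscribing to an asynchronous pipeline normally returns right away, and the result is delivered later on another thread. The sketch below illustrates that ordering using only the JDK's `CompletableFuture` as a stand-in for the Rx pipeline. It is not Vert.x code, and the class name `AsyncOrderDemo`, the `run` helper, and the sample CSV string are made up for illustration. It does not explain the blocked worker-thread warning.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

public class AsyncOrderDemo {

    // Runs a small asynchronous pipeline and returns the order in which events happened.
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch release = new CountDownLatch(1);

        // Stand-in for rxReadFile(...).subscribe(...): the work runs on another thread.
        CompletableFuture<Void> pipeline = CompletableFuture
                .supplyAsync(() -> {
                    try {
                        // Simulate a slow read: wait until the caller has moved on.
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    return "name;value\nrow1;a\nrow2;b"; // hypothetical CSV content
                })
                .thenAccept(content -> events.add("content received"));

        // The calling thread reaches this statement without waiting for the read.
        events.add("after subscribe");

        release.countDown(); // let the simulated read finish
        pipeline.join();     // wait here only so the demo can report the final order
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch the caller records "after subscribe" before the content arrives, which is the ordering one would normally expect from the `println` that follows `subscribe(...)` in the question.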