java.io.BufferedReader().map: cannot infer type argument(s) for fromStream(Stream<? extends T>)

Question (votes: 0, answers: 1)

Scenario: Spring WebFlux triggers CommandLineRunner.run in order to load data into MongoDB for testing.

Goal: when the microservice is started locally, it should read a JSON file and load its documents into MongoDB.

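Personal knowledge: "bufferedReader.lines().filter(l -> !l.trim().isEmpty())" reads each JSON node and returns it as a stream. I can then map each "l" and access its getters. I assume I don't have to create a list and then stream it, since I have already loaded the file as a stream via "new InputStreamReader(getClass().getClassLoader().getResourceAsStream())", and I assume I can use lines() because each node will end up as a line of text. Am I heading in the right direction, or am I mixing up some ideas?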
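
One point in this reasoning is worth checking: `BufferedReader.lines()` yields a `Stream<String>` with one element per physical line of text, not one element per JSON node. The JDK-only sketch below shows what the lambda parameter `l` actually holds. The class name `LinesAreStrings` and the shortened inline sample are made up for illustration.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.List;
import java.util.stream.Collectors;

final class LinesAreStrings {

    // Returns the non-blank lines of the given text, trimmed for display.
    static List<String> nonBlankLines(String text) {
        BufferedReader reader = new BufferedReader(new StringReader(text));
        return reader.lines()
                .filter(l -> !l.trim().isEmpty())
                .map(String::trim)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical, shortened version of the sample JSON file.
        String json = "{\n"
                + "  \"Extrato\": {\n"
                + "    \"description\": \"credit\",\n"
                + "    \"status\": 11\n"
                + "  }\n"
                + "}\n";
        // Each element is a raw String fragment such as `{` or `"status": 11`,
        // so there is no getDescription() to call on it.
        nonBlankLines(json).forEach(System.out::println);
    }
}
```

Because every element is a plain `String`, a lambda that calls `l.getDescription()` cannot compile, which is one plausible source of the type-inference error reported for `fromStream`. Turning the text into `Extrato` objects would need a JSON parser applied to the whole document rather than to individual lines.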

This is a sample JSON file:

{
  "Extrato": {
    "description": "credit",
    "value": "R$1.000,00",
    "status": 11
  },
  "Extrato": {
    "description": "debit",  
    "value": "R$2.000,00",
    "status": 99
  }
}

Model

import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

@Document
public class Extrato {

    @Id
    private String id;
    private String description;
    private String value;
    private Integer status;

    public Extrato(String id, String description, String value, Integer status) {
        super();
        this.id = id;
        this.description = description;
        this.value = value;
        this.status = status;
    }
... getters and setters accordingly

Repository

import org.springframework.data.mongodb.repository.Query;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;

import com.noblockingcase.demo.model.Extrato;

import reactor.core.publisher.Flux;
import org.springframework.data.domain.Pageable;

public interface ExtratoRepository extends ReactiveCrudRepository<Extrato, String> {
    @Query("{ id: { $exists: true }}")
    Flux<Extrato> retrieveAllExtratosPaged(final Pageable page);
}

Command that loads the data from the JSON file above

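import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.function.LongSupplier;

import org.springframework.boot.CommandLineRunner;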
import org.springframework.stereotype.Component;

import com.noblockingcase.demo.model.Extrato;
import com.noblockingcase.demo.repository.ExtratoRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import reactor.core.publisher.Flux;

@Component
public class TestDataLoader implements CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(TestDataLoader.class);
    private ExtratoRepository extratoRepository;

    TestDataLoader(final ExtratoRepository extratoRepository) {
        this.extratoRepository = extratoRepository;
    }

    @Override
    public void run(final String... args) throws Exception {
        if (extratoRepository.count().block() == 0L) {
            final LongSupplier longSupplier = new LongSupplier() {
                Long l = 0L;

                @Override
                public long getAsLong() {
                    return l++;
                }
            };
            BufferedReader bufferedReader = new BufferedReader(
                    new InputStreamReader(getClass().getClassLoader().getResourceAsStream("carga-teste.txt")));

//*** THE ISSUE IS NEXT LINE
            Flux.fromStream(bufferedReader.lines().filter(l -> !l.trim().isEmpty())
                    .map(l -> extratoRepository.save(new Extrato(String.valueOf(longSupplier.getAsLong()),
                            l.getDescription(), l.getValue(), l.getStatus()))))
                    .subscribe(m -> log.info("Carga Teste: {}", m.block()));

        }
    }

}

Here is the MongoDB configuration, although I don't think it is relevant

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.mongodb.MongoClientOptions;

@Configuration
public class MongoDbSettings {

    @Bean
    public MongoClientOptions mongoOptions() {
        return MongoClientOptions.builder().socketTimeout(2000).build();
    }

}

**** Edited following Nicolas's suggestion

I tried:

Flux.using(() -> new BufferedReader(
        new InputStreamReader(new ClassPathResource("carga-teste.json").getInputStream())).lines()
                .map(s -> {
                    String[] sa = s.split(" ");
                    return new Extrato(String.valueOf(longSupplier.getAsLong()), sa[0], sa[1],
                            Integer.getInteger(sa[2]));
                }),
        Flux::fromStream, BaseStream::close)
.subscribe(m -> log.info("Carga Teste: {}"));

and I got java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1

Full log:

2020-03-25 12:02:16.851  INFO 24328 --- [ntLoopGroup-2-2] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:3, serverValue:48}] to 192.168.99.100:27017
2020-03-25 12:02:16.868  INFO 24328 --- [           main] ConditionEvaluationReportLoggingListener : 

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2020-03-25 12:02:16.872 ERROR 24328 --- [           main] o.s.boot.SpringApplication               : Application run failed

java.lang.IllegalStateException: Failed to execute CommandLineRunner
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:787) ~[spring-boot-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:768) ~[spring-boot-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:322) ~[spring-boot-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) ~[spring-boot-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at com.noblockingcase.demo.DemoApplication.main(DemoApplication.java:12) ~[classes/:na]
Caused by: reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1
    at com.noblockingcase.demo.configuration.TestDataLoader.lambda$3(TestDataLoader.java:52) ~[classes/:na]
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) ~[na:na]
    at java.base/java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1812) ~[na:na]
    at java.base/java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:294) ~[na:na]
    at java.base/java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:206) ~[na:na]
    at java.base/java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:161) ~[na:na]
    at java.base/java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:300) ~[na:na]
    at java.base/java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681) ~[na:na]
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:129) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
    at reactor.core.publisher.FluxStream.subscribe(FluxStream.java:71) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
    at reactor.core.publisher.FluxUsing.subscribe(FluxUsing.java:102) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
    at reactor.core.publisher.Flux.subscribe(Flux.java:8186) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
    at reactor.core.publisher.Flux.subscribeWith(Flux.java:8350) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
    at reactor.core.publisher.Flux.subscribe(Flux.java:8157) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
    at reactor.core.publisher.Flux.subscribe(Flux.java:8084) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
    at reactor.core.publisher.Flux.subscribe(Flux.java:8027) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
    at com.noblockingcase.demo.configuration.TestDataLoader.run(TestDataLoader.java:56) ~[classes/:na]
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:784) ~[spring-boot-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:768) ~[spring-boot-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:322) ~[spring-boot-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) ~[spring-boot-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at com.noblockingcase.demo.DemoApplication.main(DemoApplication.java:12) ~[classes/:na]

2020-03-25 12:02:16.877  INFO 24328 --- [           main] org.mongodb.driver.connection            : Closed connection [connectionId{localValue:3, serverValue:48}] to 192.168.99.100:27017 because the pool has been closed.
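
The exception in the log is consistent with `split(" ")` being applied to a JSON line that contains no space, such as the opening `{`: the resulting array has a single element, so `sa[1]` is out of bounds. Separately, `Integer.getInteger(String)` looks up a JVM system property with that name rather than parsing the text, whereas `Integer.parseInt` is the parsing call. A JDK-only sketch of both behaviours follows; the class name and the space-separated sample line are hypothetical.

```java
final class SplitAndParseDemo {

    // Number of tokens produced by splitting a line on single spaces.
    static int tokenCount(String line) {
        return line.split(" ").length;
    }

    public static void main(String[] args) {
        // First line of the JSON file: one token, so index 1 does not exist.
        System.out.println(tokenCount("{"));                     // 1

        // Hypothetical line in the shape the split-based mapping expects.
        System.out.println(tokenCount("credit R$1.000,00 11"));  // 3

        // Reads the system property named "11"; it does not parse the text "11".
        System.out.println(Integer.getInteger("11"));  // null unless such a property is set

        // Parses the decimal text.
        System.out.println(Integer.parseInt("11"));    // 11
    }
}
```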

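If I take the original code and adapt it to read a text file, it successfully reads the text file instead of JSON. Obviously this does not meet my requirement, since I want to read a JSON file. Still, it may help clarify where I am stuck.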

carga-teste.txt (available at https://github.com/jimisdrpc/webflux-worth-scenarious/blob/master/demo/src/main/resources/carga-teste.txt)

crédito de R$1.000,00
débito de R$100,00

Code snippet using the plain text file

    BufferedReader bufferedReader = new BufferedReader(
            new InputStreamReader(getClass().getClassLoader().getResourceAsStream("carga-teste.txt")));
    Flux.fromStream(bufferedReader.lines().filter(l -> !l.trim().isEmpty())
            .map(l -> extratoRepository
                    .save(new Extrato(String.valueOf(longSupplier.getAsLong()), "Qualquer descrição", l))))
            .subscribe(m -> log.info("Carga Teste: {}", m.block()));

The whole project, which successfully reads the text file: https://github.com/jimisdrpc/webflux-worth-scenarious/tree/master/demo

Docker Compose file used to start MongoDB: https://github.com/jimisdrpc/webflux-worth-scenarious/blob/master/docker-compose.yml

In summary, my question is: I don't know how to read a JSON file and insert its data into MongoDB during CommandLineRunner.run().


mongodb spring-data java-stream spring-webflux java-io
1 answer
1 vote

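I found an example in the Javadoc of Flux::using combined with Flux::fromStream that helps with this. It reads your file into a Flux, which you can then subscribe to and process with .flatMap or something similar.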
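
`Flux.using` takes three arguments: a supplier that acquires the resource, a function that turns it into a publisher, and a cleanup callback. The sketch below is not the Javadoc example and does not use Reactor. It is a plain `java.util.stream` analogue of the same acquire, use, and clean-up lifecycle, with a `StringReader` standing in for the classpath file and a hypothetical class name.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class ResourceLifecycleSketch {

    // Reads all non-blank lines and records whether the stream was closed afterwards.
    static List<String> readAll(String text, AtomicBoolean closed) {
        // 1. Acquire: open the line stream and register a close hook.
        try (Stream<String> lines = new BufferedReader(new StringReader(text)).lines()
                .onClose(() -> closed.set(true))) {
            // 2. Use: consume the stream.
            return lines.filter(l -> !l.trim().isEmpty()).collect(Collectors.toList());
        } // 3. Clean up: try-with-resources calls Stream.close(), which runs the hook.
    }

    public static void main(String[] args) {
        AtomicBoolean closed = new AtomicBoolean(false);
        System.out.println(readAll("first\n\nsecond\n", closed)); // [first, second]
        System.out.println("closed = " + closed.get());           // closed = true
    }
}
```

In the Reactor version the three steps map onto the three arguments, with `Flux::fromStream` as the middle one and `BaseStream::close` as the cleanup, as in the attempt shown in the question.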
