使用RxJava将无限个分组事件流写入旋转文件中

问题描述 投票:1回答:1

我正在尝试实现以下行为:

  • 具有定期轮询/生成的事件流(持续时间短,例如1秒)
  • 然后根据某些内部特征对事件进行分组。
  • 每组事件都被写入匹配文件立即(这是我要维护的行为的关键)>>
  • 文件应在后续事件中重新用于匹配组(具有相同的密钥),直到文件被密封/旋转为止>>
  • 在较长的持续时间(例如5秒)内,文件被密封/旋转并使用其他订户进行操作
  • 我编写了以下示例代码来实现上述行为:


    private static final Integer EVENTS = 3;
    private static final Long SHORTER = 1L;
    private static final Long LONGER = 5L;
    private static final Long SLEEP = 100000L;

    public static void main(final String[] args) throws Exception {

        val files = new DualHashBidiMap<Integer, File>();

        Observable.just(EVENTS)
                .flatMap(num -> Observable.fromIterable(ThreadLocalRandom.current().ints(num).boxed().collect(Collectors.toList())))
                .groupBy(num -> Math.abs(num % 2))
                .repeatWhen(completed -> completed.delay(SHORTER, TimeUnit.SECONDS))
                .map(group -> {
                    val file = files.computeIfAbsent(group.getKey(), Unchecked.function(key -> File.createTempFile(String.format("%03d-", key), ".txt")));
                    group.map(Object::toString).toList().subscribe(lines -> FileUtils.writeLines(file, StandardCharsets.UTF_8.name(), lines, true));
                    return file;
                })
                .buffer(LONGER, TimeUnit.SECONDS)
                .flatMap(Observable::fromIterable)
                .distinct(File::getName)
                .doOnNext(files::removeValue)
                .doOnNext(file -> System.out.println("File - '" + file + "', Lines - " + FileUtils.readLines(file, StandardCharsets.UTF_8)))
                .subscribe();
        Thread.sleep(SLEEP);
    }

虽然它按预期方式工作(暂时为地图访问预留了线程安全性问题,但出于演示目的,我正在使用commons-collections4中的bidi-map),我想知道是否有一种方法可以实现上述功能pure

RX形式的功能,而无需依赖外部地图访问?

注意,在组创建后立即写入文件是关键,这意味着我们必须使文件处于已生成事件组范围之外的状态

提前感谢。

我正在尝试实现以下行为:定期轮询/生成事件流(持续时间短,例如1秒),然后根据某些内部特征对事件进行分组。每个组...

java rxjs rx-java reactive-programming rx-java2
1个回答
1
投票

有趣的问题。我可能是错的,但我认为您不能避免在管道中某处出现MapFiles的情况。

我认为我的解决方案可以进一步清理,但似乎可以完成以下任务:

© www.soinside.com 2019 - 2024. All rights reserved.