我正在尝试实现以下行为:
我编写了以下示例代码来实现上述行为:
private static final Integer EVENTS = 3; private static final Long SHORTER = 1L; private static final Long LONGER = 5L; private static final Long SLEEP = 100000L; public static void main(final String[] args) throws Exception { val files = new DualHashBidiMap<Integer, File>(); Observable.just(EVENTS) .flatMap(num -> Observable.fromIterable(ThreadLocalRandom.current().ints(num).boxed().collect(Collectors.toList()))) .groupBy(num -> Math.abs(num % 2)) .repeatWhen(completed -> completed.delay(SHORTER, TimeUnit.SECONDS)) .map(group -> { val file = files.computeIfAbsent(group.getKey(), Unchecked.function(key -> File.createTempFile(String.format("%03d-", key), ".txt"))); group.map(Object::toString).toList().subscribe(lines -> FileUtils.writeLines(file, StandardCharsets.UTF_8.name(), lines, true)); return file; }) .buffer(LONGER, TimeUnit.SECONDS) .flatMap(Observable::fromIterable) .distinct(File::getName) .doOnNext(files::removeValue) .doOnNext(file -> System.out.println("File - '" + file + "', Lines - " + FileUtils.readLines(file, StandardCharsets.UTF_8))) .subscribe(); Thread.sleep(SLEEP); }
虽然它按预期方式工作(暂时为地图访问预留了线程安全性问题,但出于演示目的,我正在使用
RX形式的功能,而无需依赖外部地图访问?commons-collections4
中的bidi-map),我想知道是否有一种方法可以实现上述功能pure
注意,在组创建后立即写入文件是关键,这意味着我们必须使文件处于已生成事件组范围之外的状态
提前感谢。
我正在尝试实现以下行为:定期轮询/生成事件流(持续时间短,例如1秒),然后根据某些内部特征对事件进行分组。每个组...
有趣的问题。我可能是错的,但我认为您不能避免在管道中某处出现Map
为Files
的情况。
我认为我的解决方案可以进一步清理,但似乎可以完成以下任务: