当ParallelFlux中发生错误时,回滚所有的变化。

问题描述 投票:0回答:1

我正在使用spring webflux和reactor,我有以下的flux用于上传图片、调整图片大小和存储图片。对于每个尺寸,我都在一个自定义的执行服务上并行执行所述的flux。

任何方法 createDbAttachmentEntity, resizeAttachment, storeFile 可以抛出各种异常。

并行执行大小调整意味着任何参与的线程都可能抛出异常。这意味着我需要回滚所有的变化,以防某些地方没有正确执行。

例如,我可能有5个大小,但在DB中,系统只添加了4个,而5个是预期的。或者我可能在转换流时出错。或者我可能在存储系统中出现了错误。

如果出现异常,我希望能够回滚所有的更改(手动删除数据库条目和手动删除文件。

我如何才能做到这一点?

Flux.just(sizes)
        .parallel()
        .runOn(Schedulers.fromExecutor(executorService))
        .map(size -> createDbAttachmentEntity(size))
        .map(size_attachment -> resizeAttachment(size_attachment.getT1(), originalBytes))
        .map(size_attachment_bytes -> storeFile(...))
        .sequential()
        .collectList()
        .map(list -> {
            if(list.size() != sizes.length
                    || list.stream().anyMatch(objects -> objects.getT2().getId() == null)) {
                throw new RuntimeException();
            }
            return list;
        })
        .flux()
  here  .onErrorReturn(.......deleteEntities...........deleteFiles...........)     // problem: I do not have the files/entities
        .flatMap(list -> Flux.fromStream(list.stream()))
        .collectMap(Tuple2::getT1, Tuple2::getT2);

我想用这个方法来解决,但它不起作用

Flux.just(1, 2, 3, 4, 5, 6, 7)
            .map(integer -> {
                if (integer == 3) throw new RuntimeException("3");
                return integer;
            })
            .flatMap(integer -> Flux.just(integer)
                    .onErrorResume(t -> {
                        System.out.println("--onErrorResume" + integer); // here is what I need to pass in
                        return Flux.empty();
                    }))
java spring spring-webflux project-reactor reactor
1个回答
1
投票

如果我理解正确的要求,可以做一些类似于这样的事情。

自定义异常。

public class FluxEntitiesException extends RuntimeException {

    private Set<Entity> entities;
    public FluxEntitiesException() {
        super();
    }
    public FluxEntitiesException(String s) {
        super(s);
    }
    public FluxEntitiesException(String message, Throwable cause) {
        super(message, cause);
    }
    public FluxEntitiesException(Throwable cause) {
        super(cause);
    }

    public FluxEntitiesException(Set<Entity> entities) {
        super();
        this.entities = entities;
    }

    public Set<Entity> getEntities() {
        return entities;
    }

    static final long serialVersionUID = -1848914673093119416L;
}

示例处理器。

public void processError(FluxEntitiesException e){
    for(Entity entity: e.entities){
        //DO SOMETHING TO ROLL BACK
        System.out.println("Rolling back: " + entity);
    }
}

实体示例

@AllArgsConstructor
@NoArgsConstructor
@Data
public class Entity {
    private Integer id;
    //WHATEVER ELSE
}

示例Repo方法。

public Entity addEntity(Entity entity) throws RuntimeException{
    if(entity.getId() >4)
        throw new RuntimeException("OH NO!");
    System.out.println("Added entity" + entity);
    return entity;
}

方法示例。

public void AddToDb(){
    Set<Entity> entities = Collections.synchronizedSet(new HashSet<>());

    Flux.just(1,2,3,4,5,6,7)
            //Parallel stuff isn't needed was just an example since you use it in original
            .parallel()
            .runOn(Schedulers.boundedElastic())
            .map(s -> addEntity(new Entity(s)))
            .doOnNext(entities::add)
            .sequential()
            .doOnError(e -> processError(new FluxEntitiesException(entities)))
            .collectList()
            .subscribe(s -> System.out.println(entities));
}
© www.soinside.com 2019 - 2024. All rights reserved.