Non-blocking ReentrantLock with Reactor

Question  Votes: 0  Answers: 3

I need to limit the number of clients that process the same resource at the same time,
so I am trying to implement an analog of

lock.lock();
try {
    // do work
} finally {
    lock.unlock();
}

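but in a non-blocking way, using the Reactor library. I have something like this.

But I have a few questions:
Is there a better way to do this?
Or does anyone know of an existing implementation?
Or is this not how it should be done in the reactive world, and there is a different approach to this kind of problem?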

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import javax.annotation.Nullable;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

public class NonblockingLock {
    private static final Logger LOG = LoggerFactory.getLogger(NonblockingLock.class);

    private String currentOwner;
    private final AtomicInteger lockCounter = new AtomicInteger();
    private final FluxSink<Boolean> notifierSink;
    private final Flux<Boolean> notifier;
    private final String resourceId;

    public NonblockingLock(String resourceId) {
        this.resourceId = resourceId;
        EmitterProcessor<Boolean> processor = EmitterProcessor.create(1, false);
        notifierSink = processor.sink(FluxSink.OverflowStrategy.LATEST);
        notifier = processor.startWith(true);
    }

    /**
     * Nonblocking version of
     * <pre><code>
     *     lock.lock();
     *     try {
     *         do work
     *     } finally {
     *         lock.unlock();
     *     }
     * </code></pre>
     * */
    public <T> Flux<T> processWithLock(String owner, @Nullable Duration tryLockTimeout, Flux<T> work) {
        Objects.requireNonNull(owner, "owner");
        return notifier.filter(it -> tryAcquire(owner))
                .next()
                .transform(locked -> tryLockTimeout == null ? locked : locked.timeout(tryLockTimeout))
                .doOnSubscribe(s -> LOG.debug("trying to obtain lock for resourceId: {}, by owner: {}", resourceId, owner))
                .doOnError(err -> LOG.error("can't obtain lock for resourceId: {}, by owner: {}, error: {}", resourceId, owner, err.getMessage()))
                .flatMapMany(it -> work)
                .doFinally(s -> {
                    if (tryRelease(owner)) {
                        LOG.debug("release lock resourceId: {}, owner: {}", resourceId, owner);
                        notifierSink.next(true);
                    }
                });
    }

    private boolean tryAcquire(String owner) {
        boolean acquired;
        synchronized (this) {
            if (currentOwner == null) {
                currentOwner = owner;
            }
            acquired = currentOwner.equals(owner);
            if (acquired) {
                lockCounter.incrementAndGet();
            }
        }
        return acquired;
    }

    private boolean tryRelease(String owner) {
        boolean released = false;
        synchronized (this) {
            if (owner.equals(currentOwner)) { // null-safe: currentOwner is null when nobody holds the lock
                int count = lockCounter.decrementAndGet();
                if (count == 0) {
                    currentOwner = null;
                    released = true;
                }
            }
        }
        return released;
    }
}

This is how I expect it to work:

@Test
public void processWithLock() throws Exception {
    NonblockingLock lock = new NonblockingLock("work");
    String client1 = "client1";
    String client2 = "client2";
    Flux<String> requests = getWork(client1, lock)
            //emulate async request for resource by another client
            .mergeWith(Mono.delay(Duration.ofMillis(300)).flatMapMany(it -> getWork(client2, lock)))
            //emulate async request for resource by the same client
            .mergeWith(Mono.delay(Duration.ofMillis(400)).flatMapMany(it -> getWork(client1, lock)));
    StepVerifier.create(requests)
            .expectSubscription()
            .expectNext(client1)
            .expectNext(client1)
            .expectNext(client1)
            .expectNext(client1)
            .expectNext(client1)
            .expectNext(client1)
            .expectNext(client2)
            .expectNext(client2)
            .expectNext(client2)
            .expectComplete()
            .verify(Duration.ofMillis(5000));
}
private static Flux<String> getWork(String client, NonblockingLock lock) {
    return lock.processWithLock(client, null,
            Flux.interval(Duration.ofMillis(300))
                    .take(3)
                    .map(i -> client)
                    .log(client)
    );
}
java project-reactor
3 Answers
4 votes

Now that Reactor has introduced Sinks, implementing such a lock is simpler. I wrote a library with which you can write code like this:

import party.iroiro.lock.Lock;
import party.iroiro.lock.ReactiveLock;

Flux<String> getWork(String client, Duration delay, Lock lock) {
    return Mono.delay(delay)
              .flatMapMany(l -> lock.withLock(() ->
                  Flux.interval(Duration.ofMillis(300))
                      .take(3)
                      .map(i -> client)
                      .log(client)));
}

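Internally it uses a queue of Sinks.Empty to keep track of lock requests. On each unlock it simply polls the queue and emits an ON_COMPLETE signal to the corresponding Mono, which may be somewhat better than broadcasting to all requesters with Sinks.many().multicast(). It relies on the fact that a Sinks.Empty cannot be emitted to more than once, so cancelling a lock request (for those who want timeouts or need to handle complex cases) prevents the ON_COMPLETE emission, and vice versa.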

And by wrapping the lock in Flux.using, you can make sure the lock is released in all cases, just like try-finally.
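The two ideas above, a queue of one-shot signals that is polled on unlock and a release that runs on every outcome, can be sketched with JDK types only. This is not the library's code: CompletableFuture stands in for Sinks.Empty, whenComplete stands in for Flux.using, and the class name AsyncLockSketch is hypothetical. A short synchronized block guards the queue for brevity, whereas the library uses CAS.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical illustration of a queue-based async lock; not the library's implementation. */
final class AsyncLockSketch {
    private final Queue<CompletableFuture<Void>> waiters = new ArrayDeque<>();
    private boolean locked = false;

    /** Returns a future that completes once the caller owns the lock. Never blocks on the lock itself. */
    public CompletableFuture<Void> lock() {
        synchronized (this) {
            if (!locked) {
                locked = true;
                return CompletableFuture.completedFuture(null);
            }
            CompletableFuture<Void> waiter = new CompletableFuture<>();
            waiters.add(waiter);
            return waiter;
        }
    }

    /** Hands the lock to the next live waiter, or marks it free if nobody is waiting. */
    public void unlock() {
        while (true) {
            CompletableFuture<Void> next;
            synchronized (this) {
                next = waiters.poll();
                if (next == null) {
                    locked = false;
                    return;
                }
            }
            // A future completes at most once: if the waiter was cancelled,
            // complete() returns false and we move on to the next request.
            if (next.complete(null)) {
                return;
            }
        }
    }

    /** try/finally analog: the lock is released whether the work succeeds or fails. */
    public <T> CompletableFuture<T> withLock(Supplier<CompletableFuture<T>> work) {
        return lock().thenCompose(ignored -> {
            CompletableFuture<T> result;
            try {
                result = work.get();
            } catch (Throwable t) {
                result = CompletableFuture.failedFuture(t);
            }
            return result.whenComplete((value, error) -> unlock());
        });
    }

    public static void main(String[] args) {
        AsyncLockSketch lock = new AsyncLockSketch();
        CompletableFuture<String> gate = new CompletableFuture<>();
        lock.withLock(() -> gate).thenAccept(v -> System.out.println("first done: " + v));
        lock.withLock(() -> CompletableFuture.completedFuture("b"))
                .thenAccept(v -> System.out.println("second done: " + v));
        gate.complete("a"); // finishing the first holder's work lets the second one start
    }
}
```

Handing the lock directly to the next waiter, instead of waking everyone, is what keeps the number of signals per unlock at one.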

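If you are interested, here is part of the implementation. The original answer used synchronized, which could block under contention; below it is rewritten with CAS operations so that the lock is non-blocking. (In the library, all locks are now implemented with CAS operations.)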

    private volatile int count = 0;  // 0 if unlocked

    public LockHandle tryLock() {
        if (COUNT.compareAndSet(this, 0, 1)) {
            // Optimistic acquiring
            return LockHandle.empty();
        } else {
            LockHandle handle = SinkUtils.queueSink(queue);
            fairDecrement(false);
            return handle;
        }
    }
    public void unlock() {
        if (fairness) {
            fairDecrement(true);
        } else {
            COUNT.set(this, 0);
            fairDecrement(false);
        }
    }
    /*
     * If not "unlocking", fairDecrement first increments COUNT so that it does not end up unlocking a lock.
     * If "unlocking", we jump directly to the decrementing.
     */
    private void fairDecrement(boolean unlocking) {
        /*
         * COUNT states:
         * - COUNT == 0: The lock is unlocked, with no ongoing decrement operations.
         * - COUNT >= 1: Either the lock is being held, or there is an ongoing decrement operation.
         *               Note that the two are mutual exclusive, since they both require COUNT++ == 0.
         *
         * If "unlocking", then we are responsible for decrements.
         *
         * Otherwise,
         * 1. If COUNT++ >= 1, either someone is holding the lock, or there is an ongoing
         *    decrement operation. Either way, some thread will eventually emit to pending requests.
         *    We increment COUNT to signal to the emitter that the queue could have potentially been
         *    appended to after its last emission.
         * 2. If COUNT++ == 0, then we are responsible for decrementing.
         */
        if (unlocking || COUNT.incrementAndGet(this) == 1) {
            do {
                if (SinkUtils.emitAnySink(queue)) {
                    /*
                     * Leaves the decrementing job to the next lock holder, who will unlock somehow.
                     */
                    return;
                }
                /*
                 * It is now safe to decrement COUNT, since there is no concurrent decrements.
                 */
            } while (COUNT.decrementAndGet(this) != 0);
        }
    }
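The excerpt uses COUNT without showing its declaration. One plausible reading, and it is only an assumption, is an AtomicIntegerFieldUpdater over the volatile count field, because compareAndSet(this, 0, 1), incrementAndGet(this) and set(this, 0) all match that JDK API. A minimal sketch of just the optimistic acquire path under that assumption (the class name CasFlagSketch is hypothetical, and the waiter queue is left out):

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/** Hypothetical sketch of a CAS-guarded flag; the queueing logic of the answer is omitted. */
final class CasFlagSketch {
    private volatile int count = 0; // 0 if unlocked

    // Assumed declaration: the answer does not show how COUNT is defined.
    private static final AtomicIntegerFieldUpdater<CasFlagSketch> COUNT =
            AtomicIntegerFieldUpdater.newUpdater(CasFlagSketch.class, "count");

    /** Succeeds only for the single caller that moves count from 0 to 1; never blocks. */
    boolean tryAcquire() {
        return COUNT.compareAndSet(this, 0, 1);
    }

    /** Marks the flag as free again. */
    void release() {
        COUNT.set(this, 0);
    }

    public static void main(String[] args) {
        CasFlagSketch flag = new CasFlagSketch();
        System.out.println(flag.tryAcquire()); // the flag was free
        System.out.println(flag.tryAcquire()); // the flag is already taken
        flag.release();
        System.out.println(flag.tryAcquire()); // free again after release
    }
}
```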

Also, if you want to limit the number of clients to N instead of one, the library provides ReactiveSemaphore, the counterpart of java.util.concurrent.Semaphore.

2 votes

I have a solution for exclusive calls to a remote service with the same parameters. Maybe it will help in your case.

It is based on an immediate tryLock that fails with an error if the resource is busy, and on Mono.retryWhen to "wait" for the release.

So I have a LockData class for the lock's metadata:

public final class LockData {
    // Lock key to identify same operation (same cache key, for example).
    private final String key;
    // Unique identifier for equals and hashCode.
    private final String uuid;
    // Date and time of the acquiring for lock duration limiting.
    private final OffsetDateTime acquiredDateTime;
    ...
}

The LockCommand interface is an abstraction of the blocking operations on LockData:

public interface LockCommand {

    Tuple2<Boolean, LockData> tryLock(LockData lockData);

    void unlock(LockData lockData);
    ...
}

The UnlockEventsRegistry interface is an abstraction for a collector of unlock event listeners:

public interface UnlockEventsRegistry {
    // initialize event listeners collection when acquire lock
    Mono<Void> add(LockData lockData);

    // notify event listeners and remove collection when release lock
    Mono<Void> remove(LockData lockData);

    // register event listener for given lockData
    Mono<Boolean> register(LockData lockData, Consumer<Integer> unlockEventListener);
}

The Lock class can wrap a source Mono with lock and unlock, and wrap a CacheMono writer with unlock:

public final class Lock {
    private final LockCommand lockCommand;
    private final LockData lockData;
    private final UnlockEventsRegistry unlockEventsRegistry;
    private final EmitterProcessor<Integer> unlockEvents;
    private final FluxSink<Integer> unlockEventSink;

    public Lock(LockCommand lockCommand, String key, UnlockEventsRegistry unlockEventsRegistry) {
        this.lockCommand = lockCommand;
        this.lockData = LockData.builder()
                .key(key)
                .uuid(UUID.randomUUID().toString())
                .build();
        this.unlockEventsRegistry = unlockEventsRegistry;
        this.unlockEvents = EmitterProcessor.create(false);
        this.unlockEventSink = unlockEvents.sink();
    }

    ...

    public final <T> Mono<T> tryLock(Mono<T> source, Scheduler scheduler) {
        return Mono.fromCallable(() -> lockCommand.tryLock(lockData))
                .subscribeOn(scheduler)
                .flatMap(isLocked -> {
                    if (isLocked.getT1()) {
                        return unlockEventsRegistry.add(lockData)
                                .then(source
                                        .switchIfEmpty(unlock().then(Mono.empty()))
                                        .onErrorResume(throwable -> unlock().then(Mono.error(throwable))));
                    } else {
                        return Mono.error(new LockIsNotAvailableException(isLocked.getT2()));
                    }
                });
    }

    public Mono<Void> unlock(Scheduler scheduler) {
        return Mono.<Void>fromRunnable(() -> lockCommand.unlock(lockData))
                .then(unlockEventsRegistry.remove(lockData))
                .subscribeOn(scheduler);
    }

    public <KEY, VALUE> BiFunction<KEY, Signal<? extends VALUE>, Mono<Void>> unlockAfterCacheWriter(
            BiFunction<KEY, Signal<? extends VALUE>, Mono<Void>> cacheWriter) {
        Objects.requireNonNull(cacheWriter);
        return cacheWriter.andThen(voidMono -> voidMono.then(unlock())
                .onErrorResume(throwable -> unlock()));
    }

    public final <T> UnaryOperator<Mono<T>> retryTransformer() {
        return mono -> mono
                .doOnError(LockIsNotAvailableException.class,
                        error -> unlockEventsRegistry.register(error.getLockData(), unlockEventSink::next)
                                .doOnNext(registered -> {
                                    if (!registered) unlockEventSink.next(0);
                                })
                                .then(Mono.just(2).map(unlockEventSink::next)
                                        .delaySubscription(lockCommand.getMaxLockDuration()))
                                .subscribe())
                .doOnError(throwable -> !(throwable instanceof LockIsNotAvailableException),
                        ignored -> unlockEventSink.next(0))
                .retryWhen(errorFlux -> errorFlux.zipWith(unlockEvents, (error, integer) -> {
                    if (error instanceof LockIsNotAvailableException) return integer;
                    else throw Exceptions.propagate(error);
                }));
    }
}

Now if I have to wrap my Mono with CacheMono and a lock, I can do it like this:

private Mono<String> getCachedLockedMono(String cacheKey, Mono<String> source, LockCommand lockCommand, UnlockEventsRegistry unlockEventsRegistry) {
    Lock lock = new Lock(lockCommand, cacheKey, unlockEventsRegistry);

    return CacheMono.lookup(CACHE_READER, cacheKey)
            // Lock and double check
            .onCacheMissResume(() -> lock.tryLock(Mono.fromCallable(CACHE::get).switchIfEmpty(source)))
            .andWriteWith(lock.unlockAfterCacheWriter(CACHE_WRITER))
            // Retry if lock is not available
            .transform(lock.retryTransformer());
}

You can find the code and tests with examples on GitHub.
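The core pattern of this answer, try the lock immediately and retry only when an unlock event arrives, can be reduced to a small JDK-only sketch. This is an illustration under simplifying assumptions, not the answer's code: CompletableFuture replaces Mono, a listener queue replaces UnlockEventsRegistry, there is a single lock key, and the maximum lock duration is not modeled. All names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical sketch: immediate tryLock, and a retry triggered by the next unlock event. */
final class TryLockRetrySketch {
    private boolean locked = false;
    private final Queue<Runnable> unlockListeners = new ArrayDeque<>();

    /** Never waits: either takes the lock or registers a retry to run on the next unlock. */
    private synchronized boolean tryLockOrRegister(Runnable retry) {
        if (!locked) {
            locked = true;
            return true;
        }
        unlockListeners.add(retry);
        return false;
    }

    /** Releases the lock and notifies every registered listener outside the monitor. */
    private void unlock() {
        Queue<Runnable> toNotify;
        synchronized (this) {
            locked = false;
            toNotify = new ArrayDeque<>(unlockListeners);
            unlockListeners.clear();
        }
        // Every listener retries; those that lose the race register again.
        toNotify.forEach(Runnable::run);
    }

    /** Runs the source exclusively and completes the returned future with its outcome. */
    <T> CompletableFuture<T> exclusively(Supplier<CompletableFuture<T>> source) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(source, result);
        return result;
    }

    private <T> void attempt(Supplier<CompletableFuture<T>> source, CompletableFuture<T> result) {
        if (!tryLockOrRegister(() -> attempt(source, result))) {
            return; // busy: the registered listener will call attempt() again
        }
        CompletableFuture<T> work;
        try {
            work = source.get();
        } catch (Throwable t) {
            work = CompletableFuture.failedFuture(t);
        }
        work.whenComplete((value, error) -> {
            unlock(); // release on success and on failure
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.complete(value);
            }
        });
    }
}
```

Registering the listener inside the same monitor as the failed attempt avoids a lost wake-up; the answer covers that case differently, by emitting a retry signal when registration fails and after the maximum lock duration.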


1 vote

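I know this already has a few reasonable answers, but I think there is a (subjectively) simpler solution that uses flatMap (for semaphore-like use cases) or concatMap (for lock/synchronized use cases) to control parallelism.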

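This solution uses only Sinks and Reactor operators to implement locking. A publisher that is never subscribed to will not consume a lock either.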

public class ReactiveSemaphore {

    /**
     * This can be thought of as a queue of lock handles. The first element of the tuple is a signaler that accepts a
     * value when a lock is available. The second element is a Mono that completes when the lock is released.
     */
    private final Sinks.Many<Tuple2<Sinks.One<Boolean>, Mono<Boolean>>> taskQueue;
    private final Sinks.One<Boolean> close = Sinks.one();

    /**
     * Creates a ReactiveSemaphore that only allows one Publisher to be subscribed at a time. Executed by order
     * of subscription.
     */
    public ReactiveSemaphore() {
        this(1);
    }

    /**
     * Creates a ReactiveSemaphore that allows up to poolSize Publishers to be subscribed in parallel.
     * @param poolSize The number of allowed subscriptions to run in parallel.
     */
    public ReactiveSemaphore(int poolSize) {
        taskQueue = Sinks.many().unicast().onBackpressureBuffer();

        Flux<Boolean> tasks;
        if (poolSize <= 1)
            // We could use flatMap with parallelism of 1, but that seems weird
            tasks = taskQueue
                    .asFlux()
                    .concatMap(ReactiveSemaphore::dispatchTask);
        else {
            tasks = taskQueue
                    .asFlux()
                    .flatMap(ReactiveSemaphore::dispatchTask, poolSize);
        }

        tasks
                .takeUntilOther(close.asMono())
                .subscribe();
    }

    private static Mono<Boolean> dispatchTask(Tuple2<Sinks.One<Boolean>, Mono<Boolean>> task) {
        task.getT1().tryEmitValue(true); // signal that lock is available and consume lock
        return task.getT2(); // return Mono that completes when lock is released
    }

    @PreDestroy
    private void cleanup() {
        close.tryEmitValue(true);
    }

    public <T> Publisher<T> lock(Publisher<T> publisher) {
        return Flux.defer(() -> this.waitForNext(publisher));
    }

    public <T> Mono<T> lock(Mono<T> publisher) {
        return Mono.defer(() -> this.waitForNext(publisher).next());
    }

    public <T> Flux<T> lock(Flux<T> publisher) {
        return Flux.defer(() -> this.waitForNext(publisher));
    }

    /**
     * Waits for an available lock in the taskQueue. When ReactiveSemaphore is ready, a lock will be allocated for the task
     * and will not be released until the provided task errors or completes. For this reason this operation should
     * only be performed on a hot publisher (a publisher that has been subscribed to). Therefore, this method should
     * always be wrapped inside a call to {@link Flux#defer(Supplier)} or {@link Mono#defer(Supplier)}.
     * @param task The task to execute once the ReactiveSemaphore has an available lock.
     * @return The task wrapped in a Flux
     * @param <T> The type of value returned by the task
     */
    private <T> Flux<T> waitForNext(Publisher<T> task) {
        var ready = Sinks.<Boolean>one();
        var release = Sinks.<Boolean>one();
        taskQueue.tryEmitNext(Tuples.of(ready, release.asMono()));
        return ready.asMono()
                .flatMapMany(ignored -> Flux.from(task))
                .doOnComplete(() -> release.tryEmitValue(true))
                .doOnError(err -> release.tryEmitValue(true));
    }
}

Usage:

ReactiveSemaphore semaphore = new ReactiveSemaphore();
semaphore.lock(someFluxMonoOrPublisher);
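The dispatch idea behind this class, queue the tasks and let at most poolSize of them run at once, can also be shown without Reactor. The sketch below is a JDK-only illustration, not a replacement for the code above: CompletableFuture stands in for the publishers, an explicit counter plays the role of the flatMap concurrency argument, and all names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical sketch: at most poolSize async tasks run at once; the rest wait in a queue. */
final class BoundedDispatcherSketch {
    private final int poolSize;
    private int running = 0;
    private final Queue<Runnable> taskQueue = new ArrayDeque<>();

    BoundedDispatcherSketch(int poolSize) {
        this.poolSize = Math.max(1, poolSize);
    }

    /** The task is only started (the supplier is only called) once a slot is free. */
    <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> task) {
        CompletableFuture<T> result = new CompletableFuture<>();
        Runnable start = () -> {
            CompletableFuture<T> work;
            try {
                work = task.get();
            } catch (Throwable t) {
                work = CompletableFuture.failedFuture(t);
            }
            work.whenComplete((value, error) -> {
                if (error != null) {
                    result.completeExceptionally(error);
                } else {
                    result.complete(value);
                }
                taskFinished(); // free the slot on success and on failure
            });
        };
        synchronized (this) {
            if (running >= poolSize) {
                taskQueue.add(start); // no free slot: wait in subscription order
                return result;
            }
            running++;
        }
        start.run();
        return result;
    }

    /** Passes the freed slot to the next queued task, or gives the slot back if the queue is empty. */
    private void taskFinished() {
        Runnable next;
        synchronized (this) {
            next = taskQueue.poll();
            if (next == null) {
                running--;
                return;
            }
            // The slot goes straight to the next task, so running stays unchanged.
        }
        next.run();
    }
}
```

With poolSize set to 1 this behaves like the concatMap variant (strictly one task at a time, in order); larger values correspond to flatMap with a concurrency limit.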

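Example test: here we create 10 Monos that each emit a value after 1 second and try to run all of them in parallel, but we wrap them in a ReactiveSemaphore with a pool size of 2 so that no more than 2 of them run in parallel: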

@Test
public void testParallelExecution() {
    var semaphore = new ReactiveSemaphore(2);
    var monos = IntStream.range(0, 10)
            .mapToObj(i -> Mono.fromSupplier(() -> {
                        log.info("Executing Mono {}", i);
                        return i;
                    })
                    .delayElement(Duration.ofMillis(1000)))
            .map(mono -> semaphore.lock(mono));

    var allMonos = Flux.fromStream(monos).flatMap(m -> m).doOnNext(v -> log.info("Got value {}", v));

    StepVerifier.create(allMonos)
            .expectNextCount(10)
            .verifyComplete();
}

/* OUTPUT:
12:52:40.752 [main] INFO my.package.ReactiveSemaphoreTest - Executing Mono 0
12:52:40.755 [main] INFO my.package.ReactiveSemaphoreTest - Executing Mono 1
12:52:41.762 [parallel-1] INFO my.package.ReactiveSemaphoreTest - Got value 0
12:52:41.765 [parallel-1] INFO my.package.ReactiveSemaphoreTest - Executing Mono 2
12:52:41.767 [parallel-2] INFO my.package.ReactiveSemaphoreTest - Got value 1
12:52:41.767 [parallel-2] INFO my.package.ReactiveSemaphoreTest - Executing Mono 3
12:52:42.780 [parallel-3] INFO my.package.ReactiveSemaphoreTest - Got value 2
12:52:42.780 [parallel-4] INFO my.package.ReactiveSemaphoreTest - Executing Mono 4
12:52:42.780 [parallel-3] INFO my.package.ReactiveSemaphoreTest - Got value 3
12:52:42.780 [parallel-4] INFO my.package.ReactiveSemaphoreTest - Executing Mono 5
12:52:43.790 [parallel-6] INFO my.package.ReactiveSemaphoreTest - Executing Mono 6
12:52:43.790 [parallel-5] INFO my.package.ReactiveSemaphoreTest - Got value 4
12:52:43.790 [parallel-5] INFO my.package.ReactiveSemaphoreTest - Got value 5
12:52:43.791 [parallel-6] INFO my.package.ReactiveSemaphoreTest - Executing Mono 7
12:52:44.802 [parallel-7] INFO my.package.ReactiveSemaphoreTest - Got value 6
12:52:44.802 [parallel-7] INFO my.package.ReactiveSemaphoreTest - Got value 7
12:52:44.802 [parallel-8] INFO my.package.ReactiveSemaphoreTest - Executing Mono 8
12:52:44.802 [parallel-8] INFO my.package.ReactiveSemaphoreTest - Executing Mono 9
12:52:45.814 [parallel-10] INFO my.package.ReactiveSemaphoreTest - Got value 9
12:52:45.814 [parallel-10] INFO my.package.ReactiveSemaphoreTest - Got value 8
*/