基于参数的Java同步(名为mutex / lock)

问题描述 投票:17回答:8

我正在寻找一种基于它接收的参数同步方法的方法,如下所示:

public synchronized void doSomething(name){
//some code
}

我想基于doSomething参数同步方法name,如下所示:

线程1:doSomething(“a”);

线程2:doSomething(“b”);

线程3:doSomething(“c”);

线程4:doSomething(“a”);

线程1,线程2和线程3将执行代码而不进行同步,但线程4将等待,直到线程1完成代码,因为它具有相同的“a”值。

谢谢

UPDATE

根据都铎的解释,我认为我面临另一个问题:这是新代码的示例:

private HashMap locks=new HashMap();
public void doSomething(String name){
    locks.put(name,new Object());
    synchronized(locks.get(name)) {
        // ...
    }
    locks.remove(name);
}

我没有填充锁定映射的原因是因为name可以有任何值。

基于上面的示例,由于HashMap不是线程安全的,因此在同一时间由多个线程添加/删除散列映射中的值时会出现问题。

所以我的问题是,如果我使HashMap成为线程安全的ConcurrentHashMap,那么synchronized块会阻止其他线程访问locks.get(name)吗?

java multithreading synchronized
8个回答
16
投票

使用映射将字符串与锁定对象关联:

Map<String, Object> locks = new HashMap<String, Object>();
locks.put("a", new Object());
locks.put("b", new Object());
// etc.

然后:

public void doSomething(String name){
    synchronized(locks.get(name)) {
        // ...
    }
}

10
投票

都铎的答案很好,但它是静态的,不可扩展。我的解决方案是动态和可扩展的,但它在实现中增加了复杂性。外部世界可以像使用Lock一样使用这个类,因为这个类实现了接口。您可以通过工厂方法getCanonicalParameterLock获取参数化锁定的实例。

package lock;

import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public final class ParameterLock implements Lock {

    /** Holds a WeakKeyLockPair for each parameter. The mapping may be deleted upon garbage collection
     * if the canonical key is not strongly referenced anymore (by the threads using the Lock). */
    private static final Map<Object, WeakKeyLockPair> locks = new WeakHashMap<>();

    private final Object key;
    private final Lock lock;

    private ParameterLock (Object key, Lock lock) {
        this.key = key;
        this.lock = lock;
    }

    private static final class WeakKeyLockPair {
        /** The weakly-referenced parameter. If it were strongly referenced, the entries of
         * the lock Map would never be garbage collected, causing a memory leak. */
        private final Reference<Object> param;
        /** The actual lock object on which threads will synchronize. */
        private final Lock lock;

        private WeakKeyLockPair (Object param, Lock lock) {
            this.param = new WeakReference<>(param);
            this.lock = lock;
        }
    }

    public static Lock getCanonicalParameterLock (Object param) {
        Object canonical = null;
        Lock lock = null;

        synchronized (locks) {
            WeakKeyLockPair pair = locks.get(param);            
            if (pair != null) {                
                canonical = pair.param.get(); // could return null!
            }
            if (canonical == null) { // no such entry or the reference was cleared in the meantime                
                canonical = param; // the first thread (the current thread) delivers the new canonical key
                pair = new WeakKeyLockPair(canonical, new ReentrantLock());
                locks.put(canonical, pair);
            }
        }

        // the canonical key is strongly referenced now...
        lock = locks.get(canonical).lock; // ...so this is guaranteed not to return null
        // ... but the key must be kept strongly referenced after this method returns,
        // so wrap it in the Lock implementation, which a thread of course needs
        // to be able to synchronize. This enforces a thread to have a strong reference
        // to the key, while it isn't aware of it (as this method declares to return a 
        // Lock rather than a ParameterLock).
        return new ParameterLock(canonical, lock);               
    }

    @Override
    public void lock() {
        lock.lock();
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        lock.lockInterruptibly();
    }

    @Override
    public boolean tryLock() {
        return lock.tryLock();
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return lock.tryLock(time, unit);
    }

    @Override
    public void unlock() {
        lock.unlock();
    }

    @Override
    public Condition newCondition() {
        return lock.newCondition();
    }
}

当然,您需要一个给定参数的规范键,否则线程将不会被同步,因为它们将使用不同的Lock。规范化等同于Tudor解决方案中字符串的内化。 String.intern()本身就是线程安全的,我的'规范池'不是,所以我需要在WeakHashMap上进行额外的同步。

此解决方案适用于任何类型的对象。但是,请确保在自定义类中正确实现equalshashCode,因为如果没有,则会出现线程问题,因为多个线程可能正在使用不同的Lock对象进行同步!

WeakHashMap的选择可以通过它带来的内存管理的简易性来解释。怎么可能知道没有线程正在使用特定的锁?如果可以知道,你怎么能安全地删除地图中的条目?您需要在删除时进行同步,因为您希望使用锁定的到达线程与从地图中删除锁定的操作之间存在竞争条件。所有这些都是通过使用弱引用来解决的,因此VM为您完成了工作,这大大简化了实现。如果您检查了WeakReference的API,您会发现依赖弱引用是线程安全的。

现在检查这个测试程序(由于某些字段的私有可见性,你需要从ParameterLock类内部运行它):

public static void main(String[] args) {
    Runnable run1 = new Runnable() {

        @Override
        public void run() {
            sync(new Integer(5));
            System.gc();
        }
    };
    Runnable run2 = new Runnable() {

        @Override
        public void run() {
            sync(new Integer(5));
            System.gc();
        }
    };
    Thread t1 = new Thread(run1);
    Thread t2 = new Thread(run2);

    t1.start();
    t2.start();

    try {
        t1.join();
        t2.join();
        while (locks.size() != 0) {
            System.gc();
            System.out.println(locks);
        }
        System.out.println("FINISHED!");
    } catch (InterruptedException ex) {
        // those threads won't be interrupted
    }
}

private static void sync (Object param) {
    Lock lock = ParameterLock.getCanonicalParameterLock(param);
    lock.lock();
    try {
        System.out.println("Thread="+Thread.currentThread().getName()+", lock=" + ((ParameterLock) lock).lock);
        // do some work while having the lock
    } finally {
        lock.unlock();
    }        
}

很可能你会发现两个线程都使用相同的锁对象,因此它们是同步的。示例输出:

Thread=Thread-0, lock=java.util.concurrent.locks.ReentrantLock@8965fb[Locked by thread Thread-0]
Thread=Thread-1, lock=java.util.concurrent.locks.ReentrantLock@8965fb[Locked by thread Thread-1]
FINISHED!

但是,有可能两个线程在执行时不会重叠,因此不要求它们使用相同的锁。通过在正确的位置设置断点,可以在调试模式下轻松强制执行此行为,从而强制第一个或第二个线程在必要时停止。您还会注意到,在主线程上的垃圾收集之后,WeakHashMap将被清除,这当然是正确的,因为主线程在调用垃圾收集器之前通过调用Thread.join()等待两个工作线程完成其工作。这确实意味着在工作线程内不再存在对(参数)锁的强引用,因此可以从弱hashmap中清除引用。如果另一个线程现在想要在同一个参数上进行同步,则会在getCanonicalParameterLock的synchronized部分中创建一个新的Lock。

现在用任何具有相同规范表示的对重复测试(=它们是相同的,所以a.equals(b)),并看到它仍然有效:

sync("a");
sync(new String("a"))

sync(new Boolean(true));
sync(new Boolean(true));

等等

基本上,这个类为您提供以下功能:

  • 参数化同步
  • 封装内存管理
  • 能够处理任何类型的对象(在equalshashCode正确实现的条件下)
  • 实现Lock接口

这个Lock实现已经通过修改一个ArrayList与10个线程同时修改1000次来测试,这样做:添加2个项目,然后通过迭代完整列表删除最后找到的列表条目。每次迭代都会请求锁定,因此总共需要10 * 1000个锁。没有引发ConcurrentModificationException,并且在所有工作线程完成后,项目总数为10 * 1000。在每次修改时,都通过调用ParameterLock.getCanonicalParameterLock(new String("a"))来请求锁定,因此使用新的参数对象来测试规范化的正确性。

请注意,您不应该为参数使用字符串文字和基本类型。由于字符串文字是自动实现的,因此它们总是具有强引用,因此如果第一个线程以其参数的字符串文字到达,则锁定池将永远不会从条目中释放,这是内存泄漏。同样的故事适用于自动装箱原语:例如Integer有一个缓存机制,它将在自动装箱过程中重用现有的Integer对象,同时也会导致存在强引用。然而,解决这个问题,这是一个不同的故事。


1
投票

看看这个框架。好像你在找这样的东西。

public class WeatherServiceProxy {
...
private final KeyLockManager lockManager = KeyLockManagers.newManager();

public void updateWeatherData(String cityName, Date samplingTime, float temperature) {
        lockManager.executeLocked(cityName, new LockCallback() {
                public void doInLock() {
                        delegate.updateWeatherData(cityName, samplingTime, temperature);
                }
        });
}

https://code.google.com/p/jkeylockmanager/


1
投票

我已经基于McDowell的IdMutexProvider创建了一个tokenProvider。经理使用WeakHashMap来处理清理未使用的锁。

你可以找到我的实施here


1
投票

我通过另一个stackoverflow问题找到了正确的答案:How to acquire a lock by a key

我在这里复制了答案:

番石榴有这样的东西在13.0发布;如果你愿意,你可以把它从HEAD中拿出来。

Striped或多或少分配特定数量的锁,然后根据哈希代码将字符串分配给锁。 API看起来或多或少像

Striped<Lock> locks = Striped.lock(stripes);
Lock l = locks.get(string);
l.lock();
try {
  // do stuff 
} finally {
  l.unlock();
}

或多或少,可控制的条带数使您可以根据内存使用情况进行并发交易,因为为每个字符串键分配完整锁定会变得昂贵;实质上,当你得到哈希冲突时,你只会得到锁争用,这是(可预见的)罕见的。


1
投票

TL; DR:

我使用Spring Framework中的ConcurrentReferenceHashMap。请检查以下代码。


虽然这个帖子很老,但它仍然很有趣。因此,我想与Spring Framework分享我的方法。

我们试图实现的是名为mutex / lock。正如Tudor's answer所建议的那样,想法是有一个Map来存储锁名称和锁定对象。代码如下所示(我从他的回答中复制):

Map<String, Object> locks = new HashMap<String, Object>();
locks.put("a", new Object());
locks.put("b", new Object());

但是,这种方法有两个缺点:

  1. OP已经指出了第一个:如何同步访问locks哈希映射?
  2. 如何删除一些不再需要的锁?否则,locks哈希映射将继续增长。

第一个问题可以通过使用ConcurrentHashMap来解决。对于第二个问题,我们有两个选项:手动检查并从地图中删除锁,或者以某种方式让垃圾收集器知道哪些锁不再使用,GC将删除它们。我将采用第二种方式。

当我们使用HashMapConcurrentHashMap时,它会创建强大的参考。要实现上面讨论的解决方案,应该使用弱引用(要了解什么是强/弱引用,请参考this articlethis post)。


所以,我使用Spring Framework中的ConcurrentReferenceHashMap。如文档中所述:

ConcurrentHashMap对键和值使用软引用或弱引用。

此类可用作Collections.synchronizedMap(new WeakHashMap<K, Reference<V>>())的替代方法,以便在并发访问时支持更好的性能。此实现遵循与ConcurrentHashMap相同的设计约束,但支持null值和null键。

这是我的代码。 MutexFactory<K>管理所有锁是关键的类型。

@Component
public class MutexFactory<K> {

    private ConcurrentReferenceHashMap<K, Object> map;

    public MutexFactory() {
        this.map = new ConcurrentReferenceHashMap<>();
    }

    public Object getMutex(K key) {
        return this.map.compute(key, (k, v) -> v == null ? new Object() : v);
    }
}

用法:

@Autowired
private MutexFactory<String> mutexFactory;

public void doSomething(String name){
    synchronized(mutexFactory.getMutex(name)) {
        // ...
    }
}

单元测试(此测试使用awaitility库进行某些方法,例如await()atMost()until()):

public class MutexFactoryTests {
    private final int THREAD_COUNT = 16;

    @Test
    public void singleKeyTest() {
        MutexFactory<String> mutexFactory = new MutexFactory<>();
        String id = UUID.randomUUID().toString();
        final int[] count = {0};

        IntStream.range(0, THREAD_COUNT)
                .parallel()
                .forEach(i -> {
                    synchronized (mutexFactory.getMutex(id)) {
                        count[0]++;
                    }
                });
        await().atMost(5, TimeUnit.SECONDS)
                .until(() -> count[0] == THREAD_COUNT);
        Assert.assertEquals(count[0], THREAD_COUNT);
    }
}

0
投票

我使用缓存来存储锁定对象。我的缓存将在一段时间后使对象失效,这实际上只需要比同步进程运行所花费的时间更长

`

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

...

private final Cache<String, Object> mediapackageLockCache = CacheBuilder.newBuilder().expireAfterWrite(DEFAULT_CACHE_EXPIRE, TimeUnit.SECONDS).build();

...

public void doSomething(foo) {
    Object lock = mediapackageLockCache.getIfPresent(foo.toSting());
    if (lock == null) {
        lock = new Object();
        mediapackageLockCache.put(foo.toString(), lock);
    }

    synchronized(lock) {
        // execute code on foo
        ...
    }
}

`


0
投票

我有一个更简单,可扩展的实现,类似于@timmons post利用guavas LoadingCacheweakValues。您将需要阅读“相等”的帮助文件,以了解我所做的建议。

定义以下弱值缓存。

private final LoadingCache<String,String> syncStrings = CacheBuilder.newBuilder().weakValues().build(new CacheLoader<String, String>() {
    public String load(String x) throws ExecutionException {
        return new String(x);
    }
});

public void doSomething(String x) {
      x = syncStrings.get(x);
      synchronized(x) {
          ..... // whatever it is you want to do
      }
}

现在!作为JVM的结果,我们不必担心缓存增长太大,只要有必要就保存缓存的字符串,垃圾管理器/番石榴就可以解决繁重的问题。

© www.soinside.com 2019 - 2024. All rights reserved.