使用 ConcurrentHashMap 替换带有锁定的 HazelCast IMap

问题描述 投票:0回答:1

从不再需要分布式缓存的应用程序中删除 HazelCast。
但是我仍然需要保持对类似于 IMap 的 Map 的同步访问。
我首先想用Map和ConcurrentHashMap替换IMap。

现在集合只是一个简单的Java Map,但我仍然需要控制对其的异步读/写。
ConcurrentHashMap 是否足以解决这个问题,或者我是否需要其他锁定解决方案/机制。

该应用程序使用 RabbitMQ,有 3 个队列。
使用 HazelCast,我有一个带有域对象的 IMap,我在访问它的每个 MQ 线程中执行了锁定。

在批处理作业的初始化期间,它会创建新的域对象,并将其放置在地图上。
在初始化结束时,它会在 3 个不同的消息队列上为 Map 中的每个对象发送请求。

只有 1 个 MQ 线程应该能够从 Map 获取对象、更新它并将其放回到 Map 上。
ConcurrentHashMap 可以处理这个问题吗?我需要在 Map.get 和 Map.put 之间实现一些锁定/解锁吗?

使用 HazelCast,这是通过锁定地图来完成的。

IMap<Long, MyObject> myCollection;

public void processMessage1(final MyResponse1 reponse) { // MQ-Thread1
    final Long myObjectId = response.getObjectId();
    myCollection.lock(myObjectId);
    try {
        final MyObject myObject = myCollection.get(myObjectId);
        updateMyObject(myObject, response);
        myCollection.put(myObjectId, myObject)
        if (myObject.isCompleted()) {
            repository.save(myObject);
        }
    } finally {
        myCollection.unlock(myObjectId);
    }
}

public void processMessage2(final MyResponse2 reponse) { // MQ-Thread2
    // Similar with locking as processMessage1
}

public void processMessage3(final MyResponse3 reponse) { // MQ-Thread3
    // Similar with locking as processMessage1
}

将 HazelCast 和 IMap 锁定更改为 ConcurrentHashMap 后,我的代码现在如何。


@Data
@Component
public class MyCache {
    private final Map<Long, Producer> producers = new HashMap<>();
    private final ConcurrentMap<Long, Customer> customers = new ConcurrentHashMap<>();
}

@Service
public class CustomerService {

    private final MyCache myCache;

    public CustomerService(final MyCache myCache) {
        this.myCache = myCache;
    }

    @Transaction
    public void startProcessing() {
        getProducers();
        deleteCustomers();
        myCache.getProducers().keySet().forEach(id -> startProcessingProducer(id));
    }

    public void startProcessingProducer(Long producerId) {
        log.info("Started processing for producerId={}", producerId);
        initialize(producerId);
    }

    public void initialize(Long producerId) {
        log.info("Initialize start {}", producerId);
        final Customer customer = new Customer();
        sendRabbitMq1(producerId);
        sendRabbitMq2(producerId);
        sendRabbitMq3(producerId);

        myCache.getCustomers().put(producerId, customer);
        log.info("Initialize end {}", producerId);
    }

}

@Service
public class MessageService {

    private final MyCache myCache;

    private final StorageService storageService;

    public MessageService(final MyCache myCache, final StorageService storageService) {
        this.myCache = myCache;
        this.storageService = storageService;
    }

    @Bean
    @Transactional
    public Consumer<MyResponse1> response1() {
        return this::processResponse1;
    }

    @Bean
    @Transactional
    public Consumer<MyResponse2> response2() {
        return this::processResponse2;
    }

    @Bean
    @Transactional
    public Consumer<MyResponse3> response3() {
        return this::processResponse3;
    }

    private void processResponse1(final MyResponse1 response) {
        myCache.getCustomers().computeIfPresent(response.getProducerId(),
            (producerId, customer) -> updateFromResponse1(response, customer));
    }

    private Customer updateFromResponse1(final MyResponse1 response, final Customer customer) {
        log.debug("Got Response 1 {}", response);
        // Updating Customer with Response
        process(customer, storageService::saveCustomer);
        return customer;
    }

    public void process(final Customer customer, final Consumer<Customer> customerUpdateCallback) {
        // Processing Customer
        customerUpdateCallback.accept(customer);
    }

}

@Service
public class StorageService {

    private final CustomerRepository customerRepository;

    public StorageService(final CustomerRepository customerRepository) {
        this.customerRepository = customerRepository;
    }

    public void saveCustomer(final Customer customer) {
        customerRepository.save(customer);
    }

}
java spring-boot rabbitmq hazelcast spring-rabbit
1个回答
1
投票

ConcurrentHashMap 可以使用以下方法之一解决键锁定问题:

关键信息是 javadoc 指出:

整个方法调用都是原子执行的。每次调用此方法时,所提供的函数都会被调用一次。

如果我们调整您的代码示例以使用 ConcurrentHashMap 进行更新,它将给出如下所示的内容:

final Map<Long, MyObject> myCollection = new ConcurrentHashMap<>();

public void processMessage1(final MyResponse1 reponse) { // MQ-Thread1
    final Long myObjectId = response.getObjectId();
    myCollection.computeIfPresent(myObjectId, (key, oldValue) -> {
        var newValue = updateMyObject(oldValue, response);
        if (newValue.isCompleted()) {
            repository.save(newValue);
        }
        return newValue;
    });
}
© www.soinside.com 2019 - 2024. All rights reserved.