无锁动态调整数组大小实现的问题

问题描述 投票:0回答:0

我一直在按照这篇论文中的说明实现无锁数组。目标是使用无锁操作创建一个线程安全、动态调整大小的数组。

该实现对于小负载来说似乎工作得很好。但是,当我尝试同时从两个线程插入 2e5 元素时,我注意到以下错误:

1.

Process finished with exit code 139 (interrupted by signal 11: SIGSEGV).
使用调试器我能找到的只是我的描述符以某种方式结束为空。

2.

ds(86913,0x16dde7000) malloc: Heap corruption detected, free list is damaged at 0x60000192c000
*** Incorrect guard value: 4329210352
ds(86913,0x16de73000) malloc: *** error for object 0x60000192c000: pointer being freed was not allocated
ds(86913,0x16de73000) malloc: *** set a breakpoint in malloc_error_break to debug

这是我的实现:

#ifndef DS_LOCKFREEARRAY_H
#define DS_LOCKFREEARRAY_H

#include <algorithm>
#include <cstddef>

namespace fast_ds {

    namespace internal {
        template<class T>
        class write_descriptor {
        public:
            T old_value_;
            T new_value_;
            size_t location_;
            bool completed_;

            write_descriptor() : location_(0), completed_(true) {}

            explicit write_descriptor(T oldValue, T newValue, size_t location, bool completed) : old_value_(oldValue),
                                                                                                 new_value_(newValue),
                                                                                                 location_(location),
                                                                                                 completed_(completed) {}
        };

        template<class T>
        class v_descriptor {
        public:
            size_t size_ = 0;
            write_descriptor<T> writeDescriptor_;

            v_descriptor() = default;

            explicit v_descriptor(size_t size, write_descriptor<T> w_descriptor) : size_(size),
                                                                                   writeDescriptor_(w_descriptor) {}
        };
    }

    template<class T>
    class LockFreeArray {
    private:
        static constexpr size_t kNumberOfBuckets = 32;
        static constexpr size_t kFirstBucketCapacity = 2;

        std::atomic<std::atomic<T> *> *data_;
        std::shared_ptr<internal::v_descriptor<T>> descriptor_ = std::make_shared<internal::v_descriptor<T>>();

        void CompleteWrite(internal::write_descriptor<T> write_descriptor) {
            if (!write_descriptor.completed_) {
                std::atomic<T> *memoryLocation = At(write_descriptor.location_);
                memoryLocation->compare_exchange_weak(write_descriptor.old_value_,
                                                      write_descriptor.new_value_);

                write_descriptor.completed_ = true;
            }
        }

        inline int HighestBitSet(unsigned int x) const {
            return 31 - __builtin_clz(x);
        }

        void AllocBucket(int bucket) {
            int bucketSize = 1 << (bucket + HighestBitSet(kFirstBucketCapacity));
            std::atomic<T> *currentBucket = data_[bucket].load();
            auto newBucket = new std::atomic<T>[bucketSize];

            if (!data_[bucket].compare_exchange_weak(currentBucket, newBucket)) {
                delete[] newBucket;
            }
        }

    public:
        explicit LockFreeArray() {
            data_ = new std::atomic<std::atomic<T> *>[kNumberOfBuckets];
            data_[0].store(new std::atomic<T>[kFirstBucketCapacity]);
        }

        LockFreeArray(const LockFreeArray &other) = delete;

        size_t Size() noexcept {
            auto current_descriptor = descriptor_;
            auto current_size = current_descriptor->size_;

            if (!descriptor_->writeDescriptor_.completed_) {
                current_size--;
            }

            return current_size;
        }

        inline bool Empty() const noexcept {
            return Size() == 0;
        }

        std::atomic<T> *At(size_t index) const {
            auto pos = index + kFirstBucketCapacity;
            auto hibit = HighestBitSet(pos);
            auto index_in_bucket = (pos ^ (1 << hibit));
            return &data_[hibit - HighestBitSet(kFirstBucketCapacity)][index_in_bucket];
        }

        void PushBack(const T &value) {
            internal::v_descriptor<T> *current_descriptor;
            std::shared_ptr<internal::v_descriptor<T>> new_descriptor;
            std::shared_ptr<internal::v_descriptor<T>> current_descriptor_ptr = descriptor_;

            do {
                current_descriptor = descriptor_.get();
                if (current_descriptor == nullptr) {
                    continue;
                }
                current_descriptor_ptr = descriptor_;
                int current_size = current_descriptor->size_;
                internal::write_descriptor<T> wDescriptor = current_descriptor->writeDescriptor_;
                CompleteWrite(wDescriptor);

                if (int bucket = HighestBitSet(current_size + kFirstBucketCapacity) -
                                 HighestBitSet(kFirstBucketCapacity); data_[bucket].load() == 0) {
                    AllocBucket(bucket);
                }

                auto oldValue = At(current_size)->load();
                auto write_op = internal::write_descriptor<T>(oldValue, value,
                                                              current_size,
                                                              false);

                new_descriptor = std::make_shared<internal::v_descriptor<T>>(
                        internal::v_descriptor(current_size + 1, write_op));
            } while (!std::atomic_compare_exchange_strong(&descriptor_, &current_descriptor_ptr, new_descriptor));
            CompleteWrite(new_descriptor->writeDescriptor_);
        }

        T PopBack() {
            internal::v_descriptor<T> *current_descriptor;
            auto descriptorPtr = descriptor_;
            auto new_descriptor = std::make_shared<internal::v_descriptor<T>>();

            T elem;
            do {
                current_descriptor = descriptor_.get();
                if (current_descriptor == nullptr) {
                    continue;
                }
                descriptorPtr = descriptor_;
                int current_size = current_descriptor->size_;
                internal::write_descriptor<T> wDescriptor = current_descriptor->writeDescriptor_;
                CompleteWrite(wDescriptor);

                if (current_size == 0) {
                    throw std::out_of_range("vector is Empty");
                }

                elem = At(current_size - 1)->load();
                *new_descriptor = internal::v_descriptor(current_size - 1, internal::write_descriptor<T>());
            } while (!std::atomic_compare_exchange_weak(&descriptor_, &descriptorPtr, new_descriptor));

            return elem;
        }

        ~LockFreeArray() {
            //TODO
        }
    };
}

#endif

补充说明:

  1. 我使用苹果clang17来编译M1上的代码。
  2. 我尝试使用
    std::experimental::atomic_shared_ptr<T>
    atomic<shared_ptr<T>>
    (来自 clang20),但它们似乎不是由这个编译器或 gnu 实现的。
  3. 我使用原始指针来使操作尽可能快,但我考虑切换到智能指针。

我愿意接受改进我的实施的建议。

预先感谢您的帮助!

c++ data-structures memory-management concurrency lock-free
© www.soinside.com 2019 - 2024. All rights reserved.