加载中…
个人资料
  • 博客等级:
  • 博客积分:
  • 博客访问:
  • 关注人气:
  • 获赠金笔:0支
  • 赠出金笔:0支
  • 荣誉徽章:
正文 字体大小:

用C++11实现thread-safe的ringbuffer

(2017-10-27 20:54:42)
分类: 编程语言

* 将存放数据的数组想象成头尾相接的ring buffer,数据从尾写入,从头读出。
当writeIndex从后面追上readIndex时,buffer为满;当readIndex从后面追上writeIndex时,buffer为空。这两种情况下两个下标都相等,仅凭下标无法区分,因此可以用两个flag来标记满或空。这里的实现改用元素个数(cnt)来判断。

循环队列的元素个数:(尾 - 头 + 表长) % 表长(注意:队列满时尾与头相等,该式结果为0,与队列为空时相同,所以不能只靠它来区分满和空)

* 使用atomic实现自旋锁来作共享保护。

#include <atomic>
#include <memory>
#include <mutex>
#include <utility>

/**
 * Minimal spin lock built on std::atomic_flag.
 *
 * Exposes lock(), unlock() and try_lock(), so it can be used with
 * std::lock_guard. A thread that cannot take the lock busy-waits.
 */
class spinlock
{
    std::atomic_flag flag = ATOMIC_FLAG_INIT;
public:
    /** Spins until the flag has been acquired by the calling thread. */
    void lock() noexcept {
        // test_and_set returns the previous value: true means the lock is
        // still held by someone else, so keep spinning.
        while (flag.test_and_set(std::memory_order_acquire))
            ;
    }

    /** Releases the lock by clearing the flag. */
    void unlock() noexcept { flag.clear(std::memory_order_release); }

    /**
     * Attempts to take the lock without waiting.
     *
     * @return true if the lock was acquired, false if it was already held.
     */
    bool try_lock() noexcept {
        return !flag.test_and_set(std::memory_order_acquire);
    }
};

template < typename T >
class Queue
{
    static constexpt unsigned MAX_QUEUE_SIZE = 100;
public:
    Queue();
    explicit Queue(unsigned maxSize = MAX_QUEUE_SIZE);

    Queue(const Queue &other);
    Queue& operator=(const Queue &) = delete;

    ~Queue();

    // pushes an item to Queue tail
    void enqueue(const T& item);

    // pops an item from Queue head
    std::shared_ptr< T > dequeue();

    // try to push an item to Queue tail
    bool try_and_enqueue(const T& item);

    // try to pop and item from Queue head
    std::shared_ptr< T > try_and_dequeue();

    bool full();
    bool empty();
    unsigned capacity() { return CAPACITY; }
    unsigned count();

protected:
    spinlock lock;
    const unsigned CAPACITY;  // Queue capacity
    T *data;                  // array to store the items
    unsigned cnt;             // Queue count
    unsigned head;            // also the readIndex
    unsigned tail;            // also the writeIndex
};

/**
 * Builds an empty queue able to hold maxSize items.
 *
 * The backing array of maxSize default-constructed T objects is allocated
 * here and released by the destructor.
 */
template < typename T >
Queue< T >::Queue(unsigned maxSize)
    : CAPACITY(maxSize),
      data(new T[maxSize]),
      cnt(0),
      head(0),
      tail(0)
{
}

/**
 * Copy-constructs a queue with the same capacity, contents and read/write
 * positions as `other`.
 *
 * The snapshot of `other` is taken while holding other's lock, so concurrent
 * enqueue/dequeue calls on `other` cannot be observed half-way through.
 */
template < typename T >
Queue< T >::Queue(const Queue &other)
    : CAPACITY(other.CAPACITY),   // const member: must be set in the initializer list
      data(nullptr),
      cnt(0),
      head(0),
      tail(0)
{
    // Own the new buffer through a smart pointer until the copy has finished,
    // so it is freed if copying an element throws.
    std::unique_ptr< T[] > buffer(new T[CAPACITY]);
    {
        // It is the source queue that other threads may be modifying, so its
        // lock (not the lock of the object under construction) is taken.
        // `other` is a const reference but locking has to modify its
        // spinlock; the cast is well-defined as long as the source object
        // was not itself defined const.
        std::lock_guard< spinlock > lg(const_cast< spinlock & >(other.lock));
        for (unsigned i = 0; i < CAPACITY; ++i)
            buffer[i] = other.data[i];
        cnt = other.cnt;
        head = other.head;
        tail = other.tail;
    }
    data = buffer.release();
}

/** Releases the backing array. */
template < typename T >
Queue< T >::~Queue()
{
    delete[] this->data;
}

/**
 * Appends a copy of `item` at the tail, busy-waiting until space is available.
 *
 * @param item value to store; taken by const reference to match the
 *             declaration in the class.
 */
template < typename T >
void Queue< T >::enqueue(const T &item)
{
    // Retry until try_and_enqueue reports success (i.e. the queue was not full).
    while (!try_and_enqueue(item))
        ;
}

/**
 * Removes the item at the head, busy-waiting until one is available.
 *
 * @return a non-null shared_ptr owning the removed item.
 */
template < typename T >
std::shared_ptr< T > Queue< T >::dequeue()
{
    // A plain local is used on purpose: a thread_local pointer would keep a
    // reference to the last dequeued item alive after this call returns.
    std::shared_ptr< T > ptr;
    while ((ptr = try_and_dequeue()) == nullptr)
        ;
    return ptr;
}

/**
 * Tries to append a copy of `item` at the tail without waiting for space.
 *
 * @param item value to copy into the queue.
 * @return true if the item was stored, false if the queue was full.
 */
template < typename T >
bool Queue< T >::try_and_enqueue(const T &item)
{
    std::lock_guard< spinlock > lg(lock);
    if (cnt == CAPACITY)
        return false;    // full

    // Copy the item before touching the bookkeeping: if T's assignment
    // throws, cnt and tail still describe the queue correctly.
    // (item is const, so this is a copy; std::move would not change that.)
    data[tail] = item;

    ++tail;
    if (tail == CAPACITY)
        tail = 0;        // wrap around to the start of the array
    ++cnt;
    return true;
}

/**
 * Tries to remove the item at the head without waiting for one to arrive.
 *
 * @return a shared_ptr owning the removed item (move-constructed from the
 *         stored element), or a null shared_ptr if the queue was empty.
 */
template < typename T >
std::shared_ptr< T > Queue< T >::try_and_dequeue()
{
    std::lock_guard< spinlock > lg(lock);
    if (cnt == 0)
        return std::shared_ptr< T >();    // empty

    // Build the result before updating the bookkeeping: if make_shared
    // throws, cnt and head are unchanged and the item is not dropped from
    // the queue's accounting.
    std::shared_ptr< T > item = std::make_shared< T >(std::move(data[head]));

    ++head;
    if (head == CAPACITY)
        head = 0;        // wrap around to the start of the array
    --cnt;
    return item;
}

/** Reports, under the lock, whether the item count has reached the capacity. */
template < typename T >
bool Queue< T >::full()
{
    std::lock_guard< spinlock > guard(lock);
    const bool atCapacity = (CAPACITY == cnt);
    return atCapacity;
}

/** Reports, under the lock, whether the queue currently holds no items. */
template < typename T >
bool Queue< T >::empty()
{
    std::lock_guard< spinlock > guard(lock);
    const bool noItems = (0 == cnt);
    return noItems;
}

/** Returns the number of items currently stored, read under the lock. */
template < typename T >
unsigned Queue< T >::count()
{
    std::lock_guard< spinlock > guard(lock);
    const unsigned stored = cnt;
    return stored;
}

0

阅读 收藏 喜欢 打印举报/Report
  

新浪BLOG意见反馈留言板 欢迎批评指正

新浪简介 | About Sina | 广告服务 | 联系我们 | 招聘信息 | 网站律师 | SINA English | 产品答疑

新浪公司 版权所有