日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > windows >内容正文

windows

SPSC Queue

發(fā)布時間:2024/1/8 windows 28 coder
生活随笔 收集整理的這篇文章主要介紹了 SPSC Queue 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

在多線程編程中,一個著名的問題是生產(chǎn)者-消費者問題 (Producer Consumer Problem, PC Problem)。

對于這類問題,通過信號量加鎖 (https://www.cnblogs.com/sinkinben/p/14087750.html) 來設(shè)計 RingBuffer 是十分容易實現(xiàn)的,但欠缺性能。

考慮一個特殊的場景,生產(chǎn)者和消費者均只有一個 (Single Producer Single Consumer, SPSC),在這種情況下,我們可以設(shè)計一個無鎖隊列來解決 PC 問題。

0. Background

考慮以下場景:在一個計算密集型 (Computing Intensive) 和延遲敏感的 for 循環(huán)當中,每次循環(huán)結(jié)束,需要打印當前的迭代次數(shù)以及計算結(jié)果。

void matrix_compute()
{
    for (i = 0 to n)
    {
        // code of computing
        ...
        // print i and result of computing
        std::cout << ...
    }
}

在這種情況下,如果使用簡單的 std::cout 輸出,由于 I/O 的性質(zhì),將會造成嚴重的延遲 (Latency)。

一個直觀的解決辦法是:將 Log 封裝為一個字符串,傳遞給其他線程,讓其他線程打印該字符串,實現(xiàn)異步的 Logging 。

1. Lock-free SPSC Queue

此處使用一個 RingBuffer 來實現(xiàn)隊列。

由于是 SPSC 型的隊列,隊列頭部 head 只會被 Consumer 寫入,隊列尾部 tail 只會被 Producer 寫入,所以 SPSC Queue 可以是無鎖的,但需要保證寫入的原子性。

template <class T> class spsc_queue
{
  private:
    std::vector<T> m_buffer;
    std::atomic<size_t> m_head;
    std::atomic<size_t> m_tail;
  public:
    spsc_queue(size_t capacity) : m_buffer(capacity + 1), m_head(0), m_tail(0) {}
    inline bool enqueue(const T &item);
    inline bool dequeue(T &item);
};

對于一個 RingBuffer 而言,判空與判滿的方法如下:

  • Empty 的條件:head == tail
  • Full 的條件:(tail + 1) % N == head

因此,enqueuedequeue 可以是以下的實現(xiàn):

inline bool enqueue(const T &item)
{
    const size_t tail = m_tail.load(std::memory_order_relaxed);
    const size_t next = (tail + 1) % m_buffer.size();

    if (next == m_head.load(std::memory_order_acquire))
        return false;

    m_buffer[tail] = item;
    m_tail.store(next, std::memory_order_release);
    return true;
}

inline bool dequeue(T &item)
{
    const size_t head = m_head.load(std::memory_order_relaxed);

    if (head == m_tail.load(std::memory_order_acquire))
        return false;

    item = m_buffer[head];
    const size_t next = (head + 1) % m_buffer.size();
    m_head.store(next, std::memory_order_release);
    return true;
}

std::memory_order 的使用說明:https://en.cppreference.com/w/cpp/atomic/memory_order

Benchmark 計算 SPSC Queue 的吞吐量:

Mean:   29,158,897.200000 elements/s 
Median: 29,178,822.000000 elements/s 
Max:    29,315,199 elements/s 
Min:    28,995,515 elements/s 

Benchmark 的計算方法為:

  • Producer 和 Consumer 分別執(zhí)行 1e8enqueuedequeue ,計算隊列為空所耗費的總時間 t1e8 / t 即為吞吐量。
  • 上述過程執(zhí)行 10 次,最終計算 mean, median, min, max 的值。

2. Remove cache false sharing

什么是 Cache False Sharing? 參考 Architecture of Modern CPU 的 Exercise 一節(jié)。

int *a = new int[1024]; 
void worker(int idx)
{
    for (int j = 0; j < 1e9; j++)
        a[idx] = a[idx] + 1;
}

考慮以下程序:

  • P1: 開啟 2 線程,執(zhí)行 worker(0), worker(1)
  • P2: 開啟 2 線程,執(zhí)行 worker(0), worker(16)

P2 的執(zhí)行速度會比 P1 快,現(xiàn)代 CPU 的 Cache Line 大小一般為 64 字節(jié),由于 a[0], a[1] 位于同一個 CPU Core 的同一個 Cache Line,每次寫入都會帶來數(shù)據(jù)競爭 (Data Race) ,觸發(fā)緩存和內(nèi)存的同步(參考 MESI 協(xié)議),而 a[0], a[16] 之間相差了 64 字節(jié),不在同一個 Cache Line,所以避免了這個問題。

所以,對于上述的 SPSC Queue,可以進行以下改進:

template <class T>
class spsc_queue
{
private:
    std::vector<T> m_buffer;
    alignas(64) std::atomic<size_t> m_head;
    alignas(64) std::atomic<size_t> m_tail;
};

這里的 alignas(64) 實際上改為 std::hardware_constructive_interference_size 更加合理,因為 Cache Line 的大小取決于具體 CPU 硬件的實現(xiàn),并不總是為 64 字節(jié)。

#ifdef __cpp_lib_hardware_interference_size
using std::hardware_constructive_interference_size;
using std::hardware_destructive_interference_size;
#else
// 64 bytes on x86-64 │ L1_CACHE_BYTES │ L1_CACHE_SHIFT │ __cacheline_aligned │ ...
constexpr std::size_t hardware_constructive_interference_size = 64;
constexpr std::size_t hardware_destructive_interference_size = 64;
#endif

Benchmark 結(jié)果:

Mean:   38,993,940.400000 elements/s 
Median: 39,027,123.000000 elements/s 
Max:    39,253,946 elements/s 
Min:    38,624,197 elements/s 

3. Remove useless memory access

在使用 spsc_queue 的時候,通常會有以下形式的代碼:

spsc_queue sq(1024);
// Producer keep spinning
int x = 233;
while (!sq.enqueue(x)) {}

而在 dequeue/enqueue 中,存在判空/判滿的代碼:

inline bool enqueue(const T &item)
{
    const size_t tail = m_tail.load(std::memory_order_relaxed);
    const size_t next = (tail + 1) % m_buffer.size();
    if (next == m_head.load(std::memory_order_acquire))
        return false;
    // ...
}

每次執(zhí)行 m_head.load,Producer 線程的 CPU 都會訪問一次 m_head 所在的內(nèi)存,但實際上觸發(fā)該條件的概率較小(因為在實際的場景下, Producer/Consumer 都是計算密集型,否則根本不需要無鎖的數(shù)據(jù)結(jié)構(gòu))。在判空/判滿的時候,可以去 “離 CPU 更近” 的 Cache 去獲取 m_head 的值。

template <class T>
class spsc_queue
{
private:
    std::vector<T> m_buffer;
    alignas(hardware_constructive_interference_size) std::atomic<size_t> m_head;
    alignas(hardware_constructive_interference_size) std::atomic<size_t> m_tail;

    alignas(hardware_constructive_interference_size) size_t cached_head;
    alignas(hardware_constructive_interference_size) size_t cached_tail;
};

inline bool enqueue(const T &item)
{
    const size_t tail = m_tail.load(std::memory_order_relaxed);
    const size_t next = (tail + 1) % m_buffer.size();

    if (next == cached_head)
    {
        cached_head = m_head.load(std::memory_order_acquire);
        if (next == cached_head)
            return false;
    }
    // ...
}

Benchmark 結(jié)果:

Mean:   79,740,671.300000 elements/s 
Median: 79,838,314.000000 elements/s 
Max:    80,044,793 elements/s 
Min:    79,241,180 elements/s 

4. Summary

Github: https://github.com/sinkinben/lock-free-queue

3 個版本的 spsc_queue 的吞吐量比較(均值,中位數(shù),最大值,最小值)。在優(yōu)化 Cache False Sharing 和優(yōu)先從 Cache 讀取 head, tail 之后,可得到 x2 的提升。

總結(jié)

以上是生活随笔為你收集整理的SPSC Queue的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。