实现blockingqueue

文章目录
  1. 1. 参考资料
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
//SyncQueue.hpp
#include <list>
#include <mutex>
#include <condition_variable>
#include <iostream>

template<typename T>
class SyncQueue
{
private:
bool IsFull() const
{
return m_queue.size() == m_maxSize;
}

bool IsEmpty() const
{
return m_queue.empty();
}

public:
SyncQueue(int maxSize) : m_maxSize(maxSize)
{
}

void Put(const T& x)
{
std::lock_guard<std::mutex> locker(m_mutex);

while (IsFull())
{
std::cout << "the blocking queue is full,waiting..." << std::endl;
m_notFull.wait(m_mutex);
}
m_queue.push_back(x);
m_notEmpty.notify_one();
}

void Take(T& x)
{
std::lock_guard<std::mutex> locker(m_mutex);

while (IsEmpty())
{
std::cout << "the blocking queue is empty,wating..." << std::endl;
m_notEmpty.wait(m_mutex);
}

x = m_queue.front();
m_queue.pop_front();
m_notFull.notify_one();
}

private:
std::list<T> m_queue; //缓冲区
std::mutex m_mutex; //互斥量和条件变量结合起来使用
std::condition_variable_any m_notEmpty;//不为空的条件变量
std::condition_variable_any m_notFull; //没有满的条件变量
int m_maxSize; //同步队列最大的size
};

测试代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
//test.cpp

#include "SyncQueue.hpp"
#include <thread>
#include <iostream>
#include <chrono>

SyncQueue<int> syncQueue(5);
void Produce()
{
for (int i = 0; i < 15; ++i)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
syncQueue.Put(888);
std::cout<<"put(888)"<<std::endl;
}
}

void Consume()
{
int x = 0;

for (int i = 0; i < 5; ++i)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
syncQueue.Take(x);
std::cout << "take(888)" << std::endl;
}
}

int main(void)
{
std::thread producer(Produce);
std::thread consumer1(Consume);
std::thread consumer2(Consume);
std::thread consumer3(Consume);
producer.join();
consumer1.join();
consumer2.join();
consumer3.join();

return 0;
}

参考资料