muduo BoundedBlockingQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)

#ifndef MUDUO_BASE_BOUNDEDBLOCKINGQUEUE_H
#define MUDUO_BASE_BOUNDEDBLOCKINGQUEUE_H

#include "muduo/base/Condition.h"
#include "muduo/base/Mutex.h"

#include <boost/circular_buffer.hpp>
#include <assert.h>

namespace muduo
{

template<typename T>
class BoundedBlockingQueue : noncopyable
{
public:
explicit BoundedBlockingQueue(int maxSize)
: mutex_(),
notEmpty_(mutex_),
notFull_(mutex_),
queue_(maxSize)
{
}

void put(const T& x)
{
MutexLockGuard lock(mutex_);
while (queue_.full())
{
notFull_.wait();
}
assert(!queue_.full());
queue_.push_back(x);
notEmpty_.notify();
}

void put(T&& x)
{
MutexLockGuard lock(mutex_);
while (queue_.full())
{
notFull_.wait();
}
assert(!queue_.full());
queue_.push_back(std::move(x));
notEmpty_.notify();
}

T take()
{
MutexLockGuard lock(mutex_);
while (queue_.empty())
{
notEmpty_.wait();
}
assert(!queue_.empty());
T front(std::move(queue_.front()));
queue_.pop_front();
notFull_.notify();
return front;
}

bool empty() const
{
MutexLockGuard lock(mutex_);
return queue_.empty();
}

bool full() const
{
MutexLockGuard lock(mutex_);
return queue_.full();
}

size_t size() const
{
MutexLockGuard lock(mutex_);
return queue_.size();
}

size_t capacity() const
{
MutexLockGuard lock(mutex_);
return queue_.capacity();
}

private:
mutable MutexLock mutex_;
Condition notEmpty_ GUARDED_BY(mutex_);
Condition notFull_ GUARDED_BY(mutex_);
boost::circular_buffer<T> queue_ GUARDED_BY(mutex_);
};

} // namespace muduo

#endif // MUDUO_BASE_BOUNDEDBLOCKINGQUEUE_H

参考资料

查看更多

muduo封装MutexLock MutexLockGuard Condition

MutexLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
class CAPABILITY("mutex") MutexLock : noncopyable
{
public:
MutexLock()
: holder_(0)
{
MCHECK(pthread_mutex_init(&mutex_, NULL));
}

~MutexLock()
{
assert(holder_ == 0);
MCHECK(pthread_mutex_destroy(&mutex_));
}

// must be called when locked, i.e. for assertion
bool isLockedByThisThread() const
{
return holder_ == CurrentThread::tid();
}

void assertLocked() const ASSERT_CAPABILITY(this)
{
assert(isLockedByThisThread());
}

// internal usage

void lock() ACQUIRE()
{
MCHECK(pthread_mutex_lock(&mutex_));
assignHolder();
}

void unlock() RELEASE()
{
unassignHolder();
MCHECK(pthread_mutex_unlock(&mutex_));
}

pthread_mutex_t* getPthreadMutex() /* non-const */
{
return &mutex_;
}

private:
friend class Condition;

class UnassignGuard : noncopyable
{
public:
explicit UnassignGuard(MutexLock& owner)
: owner_(owner)
{
owner_.unassignHolder();
}

~UnassignGuard()
{
owner_.assignHolder();
}

private:
MutexLock& owner_;
};

void unassignHolder()
{
holder_ = 0;
}

void assignHolder()
{
holder_ = CurrentThread::tid();
}

pthread_mutex_t mutex_;
pid_t holder_;
};

MutexLockGuard

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class SCOPED_CAPABILITY MutexLockGuard : noncopyable
{
public:
explicit MutexLockGuard(MutexLock& mutex) ACQUIRE(mutex)
: mutex_(mutex)
{
mutex_.lock();
}

~MutexLockGuard() RELEASE()
{
mutex_.unlock();
}

private:

MutexLock& mutex_;
};

} // namespace muduo

// Prevent misuse like:
// MutexLockGuard(mutex_);
// A tempory object doesn't hold the lock for long!
#define MutexLockGuard(x) error "Missing guard object name"

Condition

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)

#ifndef MUDUO_BASE_CONDITION_H
#define MUDUO_BASE_CONDITION_H

#include "muduo/base/Mutex.h"

#include <pthread.h>

namespace muduo
{

class Condition : noncopyable
{
public:
explicit Condition(MutexLock& mutex)
: mutex_(mutex)
{
MCHECK(pthread_cond_init(&pcond_, NULL));
}

~Condition()
{
MCHECK(pthread_cond_destroy(&pcond_));
}

void wait()
{
MutexLock::UnassignGuard ug(mutex_);
MCHECK(pthread_cond_wait(&pcond_, mutex_.getPthreadMutex()));
}

// returns true if time out, false otherwise.
bool waitForSeconds(double seconds);

void notify()
{
MCHECK(pthread_cond_signal(&pcond_));
}

void notifyAll()
{
MCHECK(pthread_cond_broadcast(&pcond_));
}

private:
MutexLock& mutex_;
pthread_cond_t pcond_;
};

} // namespace muduo

#endif // MUDUO_BASE_CONDITION_H
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
bool muduo::Condition::waitForSeconds(double seconds)
{
struct timespec abstime;
// FIXME: use CLOCK_MONOTONIC or CLOCK_MONOTONIC_RAW to prevent time rewind.
clock_gettime(CLOCK_REALTIME, &abstime);

const int64_t kNanoSecondsPerSecond = 1000000000;
int64_t nanoseconds = static_cast<int64_t>(seconds * kNanoSecondsPerSecond);

abstime.tv_sec += static_cast<time_t>((abstime.tv_nsec + nanoseconds) / kNanoSecondsPerSecond);
abstime.tv_nsec = static_cast<long>((abstime.tv_nsec + nanoseconds) % kNanoSecondsPerSecond);

MutexLock::UnassignGuard ug(mutex_);
return ETIMEDOUT == pthread_cond_timedwait(&pcond_, mutex_.getPthreadMutex(), &abstime);
}

参考资料

查看更多

muduo CountdownLatch

CountDownLatch

用途如下:

  1. 主线程发起多个子线程,等这些子线程各自都完成一定的任务之后,主线程才继续执行。通常用于主线程等待多个子线程完成初始化。
  2. 主线程发起多个子线程,子线程都等待主线程,主线程完成其他一些任务之后通知所有子线程开始执行。通常用于多个子线程等待主线程发出起跑命令。

查看更多

std:function&std:bind&

function

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
//代码出自链接:http://www.jellythink.com/archives/771
#include <functional>
#include <iostream>
using namespace std;

std::function< int(int)> Functional;

// 普通函数
int TestFunc(int a)
{
return a;
}

// Lambda表达式
auto lambda = [](int a)->int{ return a; };

// 仿函数(functor)
class Functor
{
public:
int operator()(int a)
{
return a;
}
};

// 1.类成员函数
// 2.类静态函数
class TestClass
{
public:
int ClassMember(int a) { return a; }
static int StaticMember(int a) { return a; }
};

int main()
{
// 普通函数
Functional = TestFunc;
int result = Functional(10);
cout << "普通函数:"<< result << endl;

// Lambda表达式
Functional = lambda;
result = Functional(20);
cout << "Lambda表达式:"<< result << endl;

// 仿函数
Functor testFunctor;
Functional = testFunctor;
result = Functional(30);
cout << "仿函数:"<< result << endl;

// 类成员函数
TestClass testObj;
Functional = std::bind(&TestClass::ClassMember, testObj, std::placeholders::_1);
result = Functional(40);
cout << "类成员函数:"<< result << endl;

// 类静态函数
Functional = TestClass::StaticMember;
result = Functional(50);
cout << "类静态函数:"<< result << endl;

return 0;
}

bind

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#include <iostream>     // std::cout
#include <functional> // std::bind

// a function: (also works with function object: std::divides<double> my_divide;)
double my_divide(double x, double y) { return x / y; }

struct MyPair {
double a, b;
double multiply() { return a*b; }
};

int main() {
using namespace std::placeholders; // adds visibility of _1, _2, _3,...

// binding functions:
auto fn_five = std::bind(my_divide, 10, 2); // returns 10/2
std::cout << fn_five() << '\n'; // 5

auto fn_half = std::bind(my_divide, _1, 2); // returns x/2
std::cout << fn_half(10) << '\n'; // 5

auto fn_invert = std::bind(my_divide, _2, _1); // returns y/x
std::cout << fn_invert(10, 2) << '\n'; // 0.2

auto fn_rounding = std::bind<int>(my_divide, _1, _2); // returns int(x/y)
std::cout << fn_rounding(10, 3) << '\n'; // 3

MyPair ten_two{ 10,2 };

// binding members:

// returns x.multiply()
auto bound_member_fn = std::bind(&MyPair::multiply, _1);

std::cout << bound_member_fn(ten_two) << '\n'; // 20

// returns ten_two.a
auto bound_member_data = std::bind(&MyPair::a, ten_two);
std::cout << bound_member_data() << '\n'; // 10

return 0;
}

function 和 bind 结合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include<iostream>
#include<functional>

class A {
public:
int i_ = 0;
void output(int x, int y)
{
std::cout << x << " " << y << std::endl;
}
};

int main()
{
A a;
std::function<void(int, int)> func1 = std::bind(&A::output, &a, std::placeholders::_1,
std::placeholders::_2);
func1(1, 2);

std::function<int&(void)> func2 = std::bind(&A::i_, &a);
func2() = 888;

std::cout << a.i_ << std::endl;
return 0;
}

参考资料

查看更多

实现blockingqueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
//SyncQueue.hpp
#include <list>
#include <mutex>
#include <condition_variable>
#include <iostream>

template<typename T>
class SyncQueue
{
private:
bool IsFull() const
{
return m_queue.size() == m_maxSize;
}

bool IsEmpty() const
{
return m_queue.empty();
}

public:
SyncQueue(int maxSize) : m_maxSize(maxSize)
{
}

void Put(const T& x)
{
std::lock_guard<std::mutex> locker(m_mutex);

while (IsFull())
{
std::cout << "the blocking queue is full,waiting..." << std::endl;
m_notFull.wait(m_mutex);
}
m_queue.push_back(x);
m_notEmpty.notify_one();
}

void Take(T& x)
{
std::lock_guard<std::mutex> locker(m_mutex);

while (IsEmpty())
{
std::cout << "the blocking queue is empty,wating..." << std::endl;
m_notEmpty.wait(m_mutex);
}

x = m_queue.front();
m_queue.pop_front();
m_notFull.notify_one();
}

private:
std::list<T> m_queue; //缓冲区
std::mutex m_mutex; //互斥量和条件变量结合起来使用
std::condition_variable_any m_notEmpty;//不为空的条件变量
std::condition_variable_any m_notFull; //没有满的条件变量
int m_maxSize; //同步队列最大的size
};

测试代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
//test.cpp

#include "SyncQueue.hpp"
#include <thread>
#include <iostream>
#include <chrono>

SyncQueue<int> syncQueue(5);
void Produce()
{
for (int i = 0; i < 15; ++i)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
syncQueue.Put(888);
std::cout<<"put(888)"<<std::endl;
}
}

void Consume()
{
int x = 0;

for (int i = 0; i < 5; ++i)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
syncQueue.Take(x);
std::cout << "take(888)" << std::endl;
}
}

int main(void)
{
std::thread producer(Produce);
std::thread consumer1(Consume);
std::thread consumer2(Consume);
std::thread consumer3(Consume);
producer.join();
consumer1.join();
consumer2.join();
consumer3.join();

return 0;
}

参考资料

查看更多

实现数据库连接池

数据库连接池采用单例模式创建,使用 RAII 机制释放数据库连接。

单例模式创建连接池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class connection_pool
{
public:
//局部静态变量单例模式
static connection_pool *GetInstance();

private:
connection_pool();
~connection_pool();
}

connection_pool *connection_pool::GetInstance()
{
static connection_pool connPool;
return &connPool;
}

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
connection_pool::connection_pool()
{
m_CurConn = 0;
m_FreeConn = 0;
}

//构造初始化
void connection_pool::init(string url, string User, string PassWord, string DBName, int Port, int MaxConn, int close_log)
{
m_url = url;
m_Port = Port;
m_User = User;
m_PassWord = PassWord;
m_DatabaseName = DBName;
m_close_log = close_log;

for (int i = 0; i < MaxConn; i++)
{
MYSQL *con = NULL;
con = mysql_init(con);

if (con == NULL)
{
LOG_ERROR("MySQL Error");
exit(1);
}
con = mysql_real_connect(con, url.c_str(), User.c_str(), PassWord.c_str(), DBName.c_str(), Port, NULL, 0);

if (con == NULL)
{
LOG_ERROR("MySQL Error");
exit(1);
}
connList.push_back(con);
++m_FreeConn;
}

reserve = sem(m_FreeConn);

m_MaxConn = m_FreeConn;
}

获取&释放连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//当有请求时,从数据库连接池中返回一个可用连接,更新使用和空闲连接数
MYSQL *connection_pool::GetConnection()
{
MYSQL *con = NULL;

if (0 == connList.size())
return NULL;

reserve.wait(); // 取出连接,信号量原子 -1, 为0则等待

lock.lock();

con = connList.front();
connList.pop_front();

--m_FreeConn;
++m_CurConn;

lock.unlock();
return con;
}

//释放当前使用的连接
bool connection_pool::ReleaseConnection(MYSQL *con)
{
if (NULL == con)
return false;

lock.lock();

connList.push_back(con);
++m_FreeConn;
--m_CurConn;

lock.unlock();

reserve.post(); // 释放连接 原子+1
return true;
}

销毁连接池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//销毁数据库连接池
void connection_pool::DestroyPool()
{

lock.lock();
if (connList.size() > 0)
{
list<MYSQL *>::iterator it;
for (it = connList.begin(); it != connList.end(); ++it)
{
MYSQL *con = *it;
mysql_close(con);
}
m_CurConn = 0;
m_FreeConn = 0;
connList.clear();
}

lock.unlock();
}

查看更多

C++11 计时器

chrono

这里主要介绍时间点和时钟两个点:

​ 一般计时器就是从某个时间点开始,然后到某个时间点之间的计数,就是我们一般称之为耗时;

时间点:

1
2
template <class Clock, class Duration = typename Clock::duration>
class time_point;

std::chrono::time_point 表示一个具体时间

查看更多