八、条件变量std::condition_variable类
本节主要记录std::condition_variable
类,以及wait()
成员函数。
1、条件变量使用场景
条件变量类:std::condition_variable
,是一个和条件相关的类,这个类需要和互斥量来配合工作,用的时候需要生成这个类的对象。一个适用的场景如下:
线程A中:等待一个条件满足,之后执行;
线程B中:线程B满足条件之后触发线程A。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| class A { public: void inMsgRecvQueue() { for (int i = 0; i < 100000; ++i) { cout << "inMsgRecvQueue()执行,插入一个元素" << i << endl; std::unique_lock<std::mutex> auto_mutex_1(my_mutex1_); if (auto_mutex_1.owns_lock()) { msgRecvQueue_.push_back(i); } else { cout << "outMsgRecvQueue执行,但是没有拿到锁,只能做点别的事情" << i << endl; } } } void outMsgRecvQueue() { int command = 0; for (int i = 0; i < 100000; ++i) { bool result = outMsgLULProc(command); if (true == result) { cout << "outMsgRecvQueue()执行,取出来1个数据" << command << endl; } else { cout << "outMsgRecevQueue()执行,但是消息队列为空" << endl; } } cout << "end" << endl; }
bool outMsgLULProc(int &command) { std::unique_lock<std::mutex> auto_mutex_1(my_mutex1_); if (!msgRecvQueue_.empty()) { command = msgRecvQueue_.front(); msgRecvQueue_.pop_front(); return true; } else return false; } private: std::list<int> msgRecvQueue_; std::mutex my_mutex1_; };
|
还是这个游戏服务器的例子,inMsgRecvQueue()
函数向消息队列msgRecvQueue_
中写数据;outMsgRecvQueue()
从消息队列中读取并弹出数据。这两个线程入口函数不可以同时操作共享数据,所以做了互斥保护。但是每次进入outMsgLULProc()
函数,系统都要尝试去加锁,而实际上只有消息队列不为空时才真正需要加锁,根据第七课(单例设计模式中的共享数据)的知识,我们可以使用双重否定if (!msgRecvQueue_.empty())
来进一步提高程序的效率。双重否定后我们每次进入函数还需要判断消息队列是否为空,这里就是一个可以使用条件变量的地方,我们可以在线程B插入数据后,通知线程A消息队列不为空然后去处理数据。
2、wait()成员函数
wait()
成员函数是std::condition_variable
类中的一个重要函数,它的函数原型如下,第一参数是unique_lock
类的对象,第二个参数是一切可调用对象。
1 2 3 4 5 6
| template<class _Predicate> void wait(unique_lock<mutex>& _Lck, _Predicate _Pred) { while (!_Pred()) wait(_Lck); }
|
wait()
函数阻塞当前线程,直到被唤醒。未被唤醒(第一次执行到wait()
)与唤醒时wait()
有两种动作:
wait()成员函数第一次执行会:
如果第二个参数lambda表达式返回值是false,那么wait()将解锁互斥量,并阻塞到本行;
一直阻塞到其他线程调用notify_one()函数为止;
如果lambda返回值是true,那么wait()直接返回;
如果wait()没有第二个参数,那么与第二个参数(lambda表达式)返回false效果一样;
wait()被唤醒时的动作:
当其他线程用notify_one()将本wait()(原来是阻塞/睡着的状态)唤醒后:
a) wait()会不断地尝试重新获取互斥量锁,如果获取不到,流程卡在wait()(这就需要另一个线程notify_once()后需要把锁尽快解开)
如果获取到了锁(获取到了锁就等于加了锁),那么wait()走下来,继续执行b)
b)
b.1、如果wait()有第二个参数,判断这个lambda表达式,如果表达式为flse,wait()对互斥量解锁,又休眠等待再次被唤醒。
b.2、如果第二个参数(可调用对象)返回true,则wait()返回,流程继续执行,此时互斥量处于锁定状态。
若wait()没有第二个参数,则wait()返回,流程继续执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| #include "pch.h" #include <iostream> #include <thread> #include <list> #include <mutex> #include <chrono> using namespace std;
class A { public: void inMsgRecvQueue() { for (int i = 0; i < 100000; ++i) { cout << "inMsgRecvQueue()执行,插入一个元素" << i << endl; std::unique_lock<std::mutex> auto_mutex_1(my_mutex1_); msgRecvQueue_.push_back(i); my_cond_.notify_one(); } }
void outMsgRecvQueue() { int command; while (true) { std::unique_lock<std::mutex> auto_mutex(my_mutex1_); my_cond_.wait(auto_mutex, [this]() { if (!msgRecvQueue_.empty()) return true; return false; }); command = msgRecvQueue_.front(); msgRecvQueue_.pop_front(); auto_mutex.unlock(); cout << "outMsgRecvQueue执行,取出一个元素 " << command << endl; } }
private: std::list<int> msgRecvQueue_; std::mutex my_mutex1_; std::condition_variable my_cond_; };
int main() { A myobja; thread myOutMsgOgj(&A::outMsgRecvQueue, std::ref(myobja)); thread myInMsgObj(&A::inMsgRecvQueue, &myobja);
myInMsgObj.join(); myOutMsgOgj.join(); cout << "Hello World!\n"; return 0; }
|
3、上述程序的思考
线程A与线程B并不是你一次我一次(B线程插入一个数据通知A线程,A线程取出一个数据B再插入…)交替执行的!
B线程用notify_one()
唤醒A线程的wait()
,之后A线程的wait()
去获取锁,B线程一个循环结束后再次获取锁,这里两个线程竞速,B若再次取锁成功,则就不是只插入一个数据了。
假如outMsgRecvQueue
正在处理命令,需要一段时间,而不是正卡在wait()
,这时候notify_one()
就没有效果
由于竞争锁的原因,msgRecvQueue_
可能积压多个数据(需要考虑是否限流使B线程不继续插入数据,或增加线程去处理数据)
notify_once()
只能通知一个线程,notify_all()
激活所有线程。