C++11多线程condition_variable

八、条件变量std::condition_variable类

本节主要记录std::condition_variable类,以及wait()成员函数。

1、条件变量使用场景

条件变量类:std::condition_variable,是一个和条件相关的类,这个类需要和互斥量来配合工作,用的时候需要生成这个类的对象。一个适用的场景如下:

线程A中:等待一个条件满足,之后执行;

线程B中:线程B满足条件之后触发线程A。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class A
{
public:
void inMsgRecvQueue() // 线程B入口函数
{
for (int i = 0; i < 100000; ++i)
{
cout << "inMsgRecvQueue()执行,插入一个元素" << i << endl;
//std::unique_lock<std::mutex> auto_mutex_1(my_mutex1_, std::try_to_lock);
std::unique_lock<std::mutex> auto_mutex_1(my_mutex1_);
if (auto_mutex_1.owns_lock())
{
// 拿到了锁头
msgRecvQueue_.push_back(i); // 操作共享数据
//其他处理...
}
else
{
// 没有拿到锁头
cout << "outMsgRecvQueue执行,但是没有拿到锁,只能做点别的事情" << i << endl;
}
}
}

void outMsgRecvQueue() // 线程A入口函数
{
int command = 0;
for (int i = 0; i < 100000; ++i) // 循环10000次方便观察
{
bool result = outMsgLULProc(command);
if (true == result)
{
cout << "outMsgRecvQueue()执行,取出来1个数据" << command << endl;
// 可以进行数据处理..
}
else
{
// 消息队列为空
cout << "outMsgRecevQueue()执行,但是消息队列为空" << endl;
}
}
cout << "end" << endl;
}

bool outMsgLULProc(int &command)
{
std::unique_lock<std::mutex> auto_mutex_1(my_mutex1_);
if (!msgRecvQueue_.empty())
{
command = msgRecvQueue_.front();
msgRecvQueue_.pop_front();
return true;
}
else
return false;
}
private:
std::list<int> msgRecvQueue_;
std::mutex my_mutex1_;
};

还是这个游戏服务器的例子,inMsgRecvQueue()函数向消息队列msgRecvQueue_中写数据;outMsgRecvQueue()从消息队列中读取并弹出数据。这两个线程入口函数不可以同时操作共享数据,所以做了互斥保护。但是每次进入outMsgLULProc()函数,系统都要尝试去加锁,而实际上只有消息队列不为空时才真正需要加锁,根据第七课(单例设计模式中的共享数据)的知识,我们可以使用双重否定if (!msgRecvQueue_.empty())来进一步提高程序的效率。双重否定后我们每次进入函数还需要判断消息队列是否为空,这里就是一个可以使用条件变量的地方,我们可以在线程B插入数据后,通知线程A消息队列不为空然后去处理数据。

2、wait()成员函数

wait()成员函数是std::condition_variable类中的一个重要函数,它的函数原型如下,第一参数是unique_lock类的对象,第二个参数是一切可调用对象。

1
2
3
4
5
6
template<class _Predicate>
void wait(unique_lock<mutex>& _Lck, _Predicate _Pred)
{ // wait for signal and test predicate
while (!_Pred())
wait(_Lck);
}

wait()函数阻塞当前线程,直到被唤醒。未被唤醒(第一次执行到wait())与唤醒时wait()有两种动作:

  1. wait()成员函数第一次执行会:

    如果第二个参数lambda表达式返回值是false,那么wait()将解锁互斥量,并阻塞到本行;

    一直阻塞到其他线程调用notify_one()函数为止;

    如果lambda返回值是true,那么wait()直接返回;

    如果wait()没有第二个参数,那么与第二个参数(lambda表达式)返回false效果一样;

  2. wait()被唤醒时的动作:

    当其他线程用notify_one()将本wait()(原来是阻塞/睡着的状态)唤醒后:

    a) wait()会不断地尝试重新获取互斥量锁,如果获取不到,流程卡在wait()(这就需要另一个线程notify_once()后需要把锁尽快解开)

    如果获取到了锁(获取到了锁就等于加了锁),那么wait()走下来,继续执行b)

    b)

    ​ b.1、如果wait()有第二个参数,判断这个lambda表达式,如果表达式为flse,wait()对互斥量解锁,又休眠等待再次被唤醒。

    ​ b.2、如果第二个参数(可调用对象)返回true,则wait()返回,流程继续执行,此时互斥量处于锁定状态。

    ​ 若wait()没有第二个参数,则wait()返回,流程继续执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#include "pch.h"
#include <iostream>
#include <thread>
#include <list>
#include <mutex>
#include <chrono>
using namespace std;

class A
{
public:
void inMsgRecvQueue() // B线程入口函数
{
for (int i = 0; i < 100000; ++i)
{
cout << "inMsgRecvQueue()执行,插入一个元素" << i << endl;
std::unique_lock<std::mutex> auto_mutex_1(my_mutex1_);
msgRecvQueue_.push_back(i);
my_cond_.notify_one(); // 尝试把wait()的线程唤醒
// 执行完上一句,那么outMsgRecvQueue()里边的wait就会被唤醒
// 假如outMsgRecvQueue正在处理命令,需要一段时间,而不是正卡在wait(), 这时候notify_one()就没有效果
// 其他处理代码...
}
} // auto_mutex_1析构,锁头释放掉了

void outMsgRecvQueue() // A线程入口函数
{
int command;
while (true)
{
std::unique_lock<std::mutex> auto_mutex(my_mutex1_); // 拿到锁才会继续执行
my_cond_.wait(auto_mutex, [this]() {
if (!msgRecvQueue_.empty())
return true;
return false;
});
// 流程能到这里,互斥量一定是锁着的
command = msgRecvQueue_.front();
msgRecvQueue_.pop_front();
auto_mutex.unlock(); // 解锁,使得其他线程可以继续执行,unique_lock的灵活性(随时可以unlock)
cout << "outMsgRecvQueue执行,取出一个元素 " << command << endl;
// 执行一些处理动作
}
}

private:
std::list<int> msgRecvQueue_;
std::mutex my_mutex1_;
std::condition_variable my_cond_; // 一个条件对象
};

int main()
{
A myobja;
thread myOutMsgOgj(&A::outMsgRecvQueue, std::ref(myobja));
thread myInMsgObj(&A::inMsgRecvQueue, &myobja); // 第二个参数是&才能保证使用的是同一个对象!

myInMsgObj.join();
myOutMsgOgj.join();
cout << "Hello World!\n";
return 0;
}

3、上述程序的思考

线程A与线程B并不是你一次我一次(B线程插入一个数据通知A线程,A线程取出一个数据B再插入…)交替执行的!
B线程用notify_one()唤醒A线程的wait(),之后A线程的wait()去获取锁,B线程一个循环结束后再次获取锁,这里两个线程竞速,B若再次取锁成功,则就不是只插入一个数据了。

假如outMsgRecvQueue正在处理命令,需要一段时间,而不是正卡在wait(),这时候notify_one()就没有效果

由于竞争锁的原因,msgRecvQueue_可能积压多个数据(需要考虑是否限流使B线程不继续插入数据,或增加线程去处理数据)

notify_once()只能通知一个线程,notify_all()激活所有线程。

听说打赏我的人,最后都找到了真爱。