多线程数据共享

五、多线程数据共享

本节主要记录std::mutex类,std::lock_guard类模板、std::lock函数模板。

1、创建和等待多个线程

多个线程的执行顺序是乱的,和操作系统的内部对线程的调度机制有关。使用容器存入线程对象,以这种方式操作多个线程比较方便。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include "pch.h"
#include <iostream>
#include <thread>
#include <vector>
using namespace std;

// 线程入口函数
void myprint(int num) {
cout << "myprint线程 "<<num<<" 开始执行了,线程ID为:" << this_thread::get_id() << endl;
// ...做各种事情
cout << "myprint线程 "<<num<<" 结束运行了,线程ID为:" << this_thread::get_id()<< endl;
}
int main()
{
vector<thread> mythreads;
// 创建10个线程,线程入口函数统一使用myprint
// a)多个线程的执行顺序是乱的,和操作系统的内部对线程的调度机制有关
for (int i = 0; i < 10; ++i) {
mythreads.push_back(thread(myprint, i)); // 创建了10个线程,并且开始执行
}
for (auto iter = mythreads.begin(); iter != mythreads.end(); ++iter) {
iter->join(); // 等待10个线程都返回
}
cout << "Hello World!\n";
return 0;
}

2、只读全局数据

只读的数据,是安全稳定的,不需要特别的处理手段。

3、有读有写全局数据

假如我们有两个线程写入全局的vector<int>,八个线程从中读取,若果代码没有特别的处理,那程序肯定运行崩溃!一种最简单的不崩溃处理是:读的时候不能写,写的时候也不能读;两个线程不能同时写,8个线程不能同时读。以网络游戏服务器为例子,有两个自己的线程:一个线程收集玩家命令,并把命令数据写到一个网络服务器,另一个线程从队列中取出玩家发送来的命令,解析,然后执行该玩家需要的动作。以下的程序两个线程访问了未经过保护的共享数据,运行时就会崩溃:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class A 
{
public:
// 把收到的玩家消息入到一个队列
void inMsgRecvQueue() {
for (int i = 0; i < 100000; ++i) {
cout << "inMsgRecvQueue()执行,插入一个元素" << i << endl;
msgRecvQueue_.push_back(i); // 假设i就是收到的命令
}
}
// 把数据从消息队列中取出的线程
void outMsgRecvQueue() {
for (int i = 0; i < 100000; ++i) { // 循环10000次方便观察
if (!msgRecvQueue_.empty()) {
int command = msgRecvQueue_.front();
msgRecvQueue_.pop_front();
}
else {
// 消息队列为空
cout << "outMsgRecevQueue()执行,但是消息队列为空" << i << endl;
}
}
cout << "end" << endl;
}
private:
std::list<int> msgRecvQueue_;
};

int main() {
A myobja;
thread myOutMsgOgj(&A::outMsgRecvQueue, std::ref(myobja));
thread myInMsgObj(&A::inMsgRecvQueue, &myobja); // 第二个参数是&才能保证使用的是同一个对象!

myInMsgObj.join();
myOutMsgOgj.join();
cout << "Hello World!\n";
return 0;
}

4、共享数据的保护案例

保护共享数据时,某个线程用代码把共享的数据锁住、操作数据、解锁;其他想操作共享数据的线程必须等待解锁,之后锁住数据、操作、解锁。

4.1、互斥量

互斥量是保护共享数据的一个方法,互斥量是一个类对象,可以理解为一把锁:

  • 多个线程尝试用.lock()成员函数来加锁mutex,但是只有一个线程能锁定成功(成功的标志是lock()函数返回)
  • 如果没有返回成功,那么程序的流程会阻塞在lock()这里不断地尝试去锁这把锁头;
  • 互斥量使用要小心,数据保护少了则没有达到保护的效果,数据保护多了则影响运行效率;
  • lock()unlock()要成对使用!非对称数量的调用都会导致程序不稳定。

将上一小节的程序加上mutex保护,线程入口程序改为如下便可以顺利执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class A 
{
public:
void inMsgRecvQueue() {
for (int i = 0; i < 100000; ++i) {
cout << "inMsgRecvQueue()执行,插入一个元素" << i << endl;
// std::lock_guard<std::mutex> mutex_guard(my_mutex_);
my_mutex_.lock();
msgRecvQueue_.push_back(i);
my_mutex_.unlock();
}
}
// 为了加锁方便,把操作共享数据的部分提取出来
bool outMsgLULProc(int &command) {
// std::lock_guard<std::mutex> mutex_guard(my_mutex_);
my_mutex_.lock();
if (!msgRecvQueue_.empty()) {
command = msgRecvQueue_.front();
msgRecvQueue_.pop_front();
my_mutex_.unlock(); // 这里要注意!!!每一个程序执行的分支都要满足lock与unlock的对应
return true;
}
else
my_mutex_.unlock(); // 这里要注意!!!每一个程序执行的通路都要满足lock与unlock的对应
return false;
}
// 把数据从消息队列中取出的线程
void outMsgRecvQueue() {
int command = 0;
for (int i = 0; i < 100000; ++i) {
bool result = outMsgLULProc(command);
if (true == result) {
cout << "outMsgRecvQueue()执行,取出来1个数据" << command << endl;
// 可以进行数据处理..
}
else {
// 消息队列为空
cout << "outMsgRecevQueue()执行,但是消息队列为空"<< endl;
}
}
cout << "end" << endl;
}
private:
std::list<int> msgRecvQueue_;
std::mutex my_mutex_; // 加入的互斥锁
};

4.2、std::lock_guard类模板

lock(),忘记unlock()的问题非常难排查,为了防止出现这种情况,C++引入了一个std::lock_guard的类模板。它可以同时取代lock()unlock()函数,也就是说,用了lock_guard之后,再不能使用lock()unlock()。它的工作原理也比较简单:

  • 在构造函数里边执lock()函数;
  • 析构函数里边执行unlock()函数。

可见它不如lockunlock()灵活,一个技巧是可以在包含std::lock_guard的代码段中加入中括号{}来改变局部变量的作用域以提前析构std::lock_guard,实现提前解锁的目的。

4.3、死锁现象

死锁这个问题产生的前提条件是至少有两个互斥量,比如有两把锁头:金锁、银锁。

  1. 线程A执行时,A线程先锁金锁,金锁锁定成功,之后去lock银锁;(假设这时候发生了线程切换,银锁还没锁定就被切换走了);

  2. 线程B执行时,B线程先锁银锁,因为银锁还没被锁定,所以银锁会lock()成功,之后线程B要去lock金锁。

这时候就发生了死锁现象,线程B去lock金锁,而金锁因为B线程锁住了银锁致使A线程无法继续执行,从而不能unlock金锁;金锁不能unlock又会导致B线程无法lock金锁,进而B线程无法继续执行来unlock银锁,两个线程卡死。

死锁出现的原因主要是两个线程的互斥量锁住的顺序正好反过来了,例如A线程先锁金锁、再锁银锁;B线程先锁银锁、再锁金锁。只要保证两个互斥量上锁的顺序一致就不会造成死锁,所以一个解决方法就是保证互斥量上锁的顺序一致。

4.4、std::lock函数模板

std::lock()有能力一次锁住两个或者两个以上的互斥量(处理多个互斥量的时候才出手),它不存在因为在多个线程中因为锁的顺序问题导致的死锁。std::lock():如果互斥量中有一个没有锁住,它就阻塞,等所有互斥量都锁住,程序才继续执行。它的特点是:要么两个互斥量同时锁住,要么两个互斥量都未锁住,如果一个锁定成功另一个没锁定成功,则立即把已经锁定的锁解锁。对应的代码如下,原来的代码会出现死锁,我们用std::lock来解决:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class A 
{
public:
// 把收到的玩家消息入到一个队列
void inMsgRecvQueue() {
for (int i = 0; i < 100000; ++i) {
cout << "inMsgRecvQueue()执行,插入一个元素" << i << endl;
my_mutex1_.lock();
my_mutex2_.lock();
// std::lock(my_mutex1_, my_mutex2_); // 需要把前两行的lock()注释掉。
msgRecvQueue_.push_back(i);
my_mutex1_.unlock();
my_mutex2_.unlock();
}
}
// 为了加锁方便,把操作共享数据的部分提取出来
bool outMsgLULProc(int &command) {
my_mutex2_.lock();
my_mutex1_.lock();
// std::lock(my_mutex1_, my_mutex2_); // 需要把前两行的lock()注释掉。同时还需要自己调用unlock()
if (!msgRecvQueue_.empty()) {
command = msgRecvQueue_.front();
msgRecvQueue_.pop_front();
my_mutex1_.unlock(); // 这里要注意!!!每一个程序执行的分支都要满足lock与unlock的对应
my_mutex2_.unlock(); // 解锁的顺序随意
return true;
}
else
my_mutex2_.unlock(); // 这里要注意!!!每一个程序执行的通路都要满足lock与unlock的对应
my_mutex1_.unlock(); // 这里要注意!!!每一个程序执行的通路都要满足lock与unlock的对应
return false;
}
// 把数据从消息队列中取出的线程
void outMsgRecvQueue() {
int command = 0;
for (int i = 0; i < 100000; ++i) {
bool result = outMsgLULProc(command);
if (true == result) {
cout << "outMsgRecvQueue()执行,取出来1个数据" << command << endl;
// 可以进行数据处理..
}
else {
// 消息队列为空
cout << "outMsgRecevQueue()执行,但是消息队列为空"<< endl;
}
}
cout << "end" << endl;
}
private:
std::list<int> msgRecvQueue_;
std::mutex my_mutex1_;
std::mutex my_mutex2_;
}

使用std::lock()函数后,仍然需要我们自己手动解锁,为了防止忘记调用unlock()函数,我们依然可以使用std::lock_guard()函数模板!但是在调用std::lock()函数时我们已经加了一次锁,再次调用std::lock_guard()时不需要再lock()了,我们可以通过以下的方式来实现,这里给出上面例子的其中一个线程的入口函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void inMsgRecvQueue()
{
for (int i = 0; i < 100000; ++i)
{
cout << "inMsgRecvQueue()执行,插入一个元素" << i << endl;
//my_mutex1_.lock();
//my_mutex2_.lock();
std::lock(my_mutex1_, my_mutex2_);
std::lock_guard<std::mutex> auto_mutex_1(my_mutex1_, std::adopt_lock);
std::lock_guard<std::mutex> auto_mutex_2(my_mutex2_, std::adopt_lock);
msgRecvQueue_.push_back(i); // 假设i就是收到的命令
//my_mutex1_.unlock();
//my_mutex2_.unlock();
}
}

其中std::adopt_lock是一个结构体对象,它起一个标记作用,表示这个互斥量已经lock()过了,不需要在std::lock_guard<std::mutex>里边对对象再次lock()了。

听说打赏我的人,最后都找到了真爱。