视觉教程第三弹:cpp进阶之多线程
机械是血肉,电控是大脑,视觉是灵魂。
现代CPU几乎总是有多个核心可以参与计算,尤其是视觉会用到的各类计算平台,没有个4核8核什么的都不好意思拿出来卖。
虽然有多个CPU内核,但常规的编程都是顺序执行,只能由一个CPU内核进行计算。如果计算任务繁重,就会造成“一核有难,八核围观”的现象,此时看CPU监视器就会发现,一个核心的占用率几乎100%,而其他核心却基本在10%以内。显然这样一个软件设计,没有能利用上所有的硬件资源,造成了资源浪费,同时执行效率也不高。
本节我们主要探讨如何使用cpp进行多线程编程,以及多线程编程时的各类需要注意的点。
一、什么是多线程
线程是操作系统进行调度的最小单位,区别于进程,多线程共享同一个地址空间,意味着多线程之间的资源共享的开销会远小于多进程。
当程序启动了多个线程时,每个线程从逻辑上是并行运行的。但由于CPU核心数的限制,CPU无法同时并行执行大于核心数的任务。故当线程数大于核心数时,部分线程的执行并不是真正意义上的并行执行。而且由于操作系统上往往不会只运行这一个程序,所以CPU还需要腾出时间执行其他的程序,故实际的线程并行情况会再少一点。
当线程数大于CPU物理核心数时(最典型的例子是在单片机上,CPU通常只有一个核心),如果线程A正在运行,那么线程B一定是无法运行的,因为只有一个CPU内核嘛。线程B想要运行,需要线程A释放出CPU使用权,而释放出CPU使用权通常有两种释放方式:
- 线程A主动让出CPU使用权。通常是因为线程A需要等待一些外部资源(如用户输入)而主动进入休眠状态。
- 线程A被动让出CPU使用权。这种情况通常是由于产生了一个中断,打断了线程A的执行,而在中断退出前,操作系统往往会重新判断一下当前CPU使用权应该交给谁,如果操作系统决定将CPU使用权交给线程B(比如操作系统发现线程A长时间占用CPU),那么线程A就会被动交出CPU使用权。
由于存在被动交出CPU使用权的现象,程序员往往不能确定当前线程会在何时失去CPU使用权。
二、什么时候用多线程
使用多线程的场景有很多,但他们都有一个共同的特点,被执行的任务是可分的。如果一个任务是不可分的,那么显然这样一个任务无法通过多线程执行。下面给出一个例子,对一个数组进行求和。显然,对一个数组进行求和,可以把数组进行拆分单独求和,再加起来。
// 此函数对一个数组[begin, end)的区间进行求和,作为新线程的入口函数
int part_sum(const std::vector<int> &vec, int begin, int end){
int val = 0;
for(int i=begin; i<end; i++) val += vec[i];
return val;
}
// 此函数使用两个线程,对一个数组进行求和
int parallel_sum(const std::vector<int> &vec){
auto len = vec.size();
int sum1, sum2;
std::thread t1([&](){ sum1 = part_sum(vec, 0, len/2); });
std::thread t2([&](){ sum2 = part_sum(vec, len/2, len); });
t1.join();
t2.join();
return sum1 + sum2;
}
上面的代码出现了几个比较新的东西:std::thread,lambda表达式,join()。其中lambda表达式不是这章要讲的内容,如果不了解可以自行百度。
std::thread是一个类型,负责管理一个线程。其构造函数第一个参数是新线程的入口函数,而之后的参数则是想要传递给入口函数的参数,如果没有需要传递的参数,可以不写。在成功定义了一个std::thread变量,并指定了入口函数后,新线程将直接启动运行。
join()函数是std::thread的一个成员函数,其功能是等待该线程执行完毕,再执行完毕前join()函数都不会返回。
反观上面的代码,首先将数组对半分为两个部分,并对于每个部分都启动一个线程进行求和,然后等待这两个线程计算完毕,最后合并两个线程的计算结果。很常规的一个多线程代码。在实际情况中,可以根据CPU的物理核心数来划分这个数组,并启动对应个数的线程进行计算,充分利用CPU资源。
三、多线程同步
注意到,上面的代码中,我们为每个线程单独定义了一个变量来保存中间计算结果,而最后合并这些中间计算结果,得到最终结果。为何要多此一举,直接让每个线程把数组的值累加到一个公共变量中可以吗?
// 此函数对一个数组[begin, end)的区间进行求和,作为新线程的入口函数
void part_sum(const std::vector<int> &vec, int begin, int end, int &result){
for(int i=begin; i<end; i++) result += vec[i];
}
// 此函数使用两个线程,对一个数组进行求和
int parallel_sum(const std::vector<int> &vec){
auto len = vec.size();
int result = 0;
std::thread t1([&](){ part_sum(vec, 0, len/2, result); });
std::thread t2([&](){ part_sum(vec, len/2, len, result); });
t1.join();
t2.join();
return result;
}
在上面这个代码中,我们让每个线程直接把数组的值累加到一个公共的变量中。有兴趣的话,可以执行一下上面的代码,看看结果是什么。
这里可以告诉你们,结果通常不会是正确的,尤其是在数组比较大时,而且每次的结果还可能不一样。这里可能会导致很多刚接触多线程的人感到迷惑,同样的代码,怎么可能每次的计算结果不一样呢?这不科学啊。
关键的问题出在这句代码result += vec[i];
我们可以分析一下,这句代码翻译成CPU指令需要几条指令。显然,首先从内存中读取result到寄存器,在寄存器内做加法,再写回内存。由于多线程是并行运行的,假如线程一读取内存后,还没把计算结果写回内存,线程二就读取了内存,这会导致线程一的计算结果丢失。这里用一个流程图来解释。假设初始内存值为0,每次加1。
- 线程A读取内存——此时内存内的值为0,线程A所在CPU寄存器内的值为0
- 线程A执行加法计算——此时内存内的值为0,线程A所在CPU寄存器内的值为1
- 线程A回写内存——此时内存内的值为1,线程A所在CPU寄存器内的值为1
- 线程B读取内存——此时内存内的值为1,线程B所在CPU寄存器内的值为1
- 线程B执行加法计算——此时内存内的值为1,线程B所在CPU寄存器内的值为2
- 线程B回写内存——此时内存内的值为2,线程B所在CPU寄存器内的值为2
上面是一次正常的流程。最后内存内的值是2。
- 线程A读取内存——此时内存内的值为0,线程A所在CPU寄存器内的值为0
- 线程A执行加法计算——此时内存内的值为0,线程A所在CPU寄存器内的值为1
- 线程B读取内存——此时内存内的值为0,线程B所在CPU寄存器内的值为0
- 线程A回写内存——此时内存内的值为1,线程A所在CPU寄存器内的值为1
- 线程B执行加法计算——此时内存内的值为1,线程B所在CPU寄存器内的值为1
- 线程B回写内存——此时内存内的值为1,线程B所在CPU寄存器内的值为1
上面是一次错误的流程。最后内存内的值是1。
由于线程执行的顺序,在不加控制时,是无法确定的。所以这就会导致每次运行的结果都不一样。
我们需要一种手段,用于控制线程的执行顺序,这种手段叫做线程同步。
3.1 互斥锁
在上面的例子中,为了保证写入不出错,最简单的方法是什么呢?当一个线程读取了该内存,但还未写入时,禁止其他线程读取该内存就好了嘛。由于一个线程读取内存后,可能还需要这个值进行一些运算(这里是加法运算),才会写回内存,所以上面的方法也可以说成,某一块代码区域不允许两个线程同时执行,必须等上一个线程执行完毕后,才允许下一个线程执行。
为了实现这个功能,我们可以使用std::mutex。下面给出它的使用方法。
// 定义一个std::mutex变量,用于进行线程同步
std::mutex mtx;
// 此函数对一个数组[begin, end)的区间进行求和,作为新线程的入口函数
void part_sum(const std::vector<int> &vec, int begin, int end, int &result){
for(int i=begin; i<end; i++) {
mtx.lock();
result += vec[i];
mtx.unlock();
}
}
// 此函数使用两个线程,对一个数组进行求和
int parallel_sum(const std::vector<int> &vec){
auto len = vec.size();
int result = 0;
std::thread t1([&](){ part_sum(vec, 0, len/2, result); });
std::thread t2([&](){ part_sum(vec, len/2, len, result); });
t1.join();
t2.join();
return result;
}
对比之前的代码,主要是多出了一对lock()和unlock()代码。在调用lock()时,std::mutex会检查有没有其他线程已经调用过lock()并且还没有unlock(),如果有,那么当前线程进入休眠状态,等待其他线程unlock()后唤醒当前线程;如果没有,那么lock()函数直接返回。
std::mutex也被称为互斥锁,同一时间只能被一个线程上锁,其他线程想上锁时,必须等待当前锁被释放后,才能成功上锁。
也就是说,std::mutex可以保证,处于同一个mutex对象下的一对lock()和unlock()间的代码,不会有多个线程在同时运行。
下面给出一个多线程使用std::mutex进行同步的流程图,同样使用上面的例子。
- 线程A尝试上锁,由于此时锁没有被上锁,线程A上锁成功
- 线程A读取内存——此时内存内的值为0,线程A所在CPU寄存器内的值为0
- 线程A执行加法计算——此时内存内的值为0,线程A所在CPU寄存器内的值为1
- 线程B尝试上锁,由于此时锁已被上锁,线程B上锁失败,线程B休眠
- 线程A回写内存——此时内存内的值为1,线程A所在CPU寄存器内的值为1
- 线程A释放锁,由于此时线程B正在休眠等待锁,所以线程B被唤醒。
- 线程B读取内存——此时内存内的值为1,线程B所在CPU寄存器内的值为1
- 线程B执行加法计算——此时内存内的值为1,线程B所在CPU寄存器内的值为2
- 线程B回写内存——此时内存内的值为2,线程B所在CPU寄存器内的值为2
- 线程B释放锁,由于此时没有其他线程等待锁,所以无事发生。
在std::mutex的帮助下,最后的结果就不会是错误的了。
由于std::mutex需要手动unlock(),当一个代码段有多个退出点时,需要在每个退出点都加上unlock(),容易漏。而且如果该代码段还会抛出异常,手动unlock()会更加麻烦。这时我们通常会使用std::unique_lock或std::scoped_lock,这两个类型的作用就是在构造这个类型的对象时,传入一个std::mutex,随后会自动加锁,在对象析构时,也会自动释放锁。在它们的帮助下,上面的代码可以修改为:
// 定义一个std::mutex变量,用于进行线程同步
std::mutex mtx;
// 此函数对一个数组[begin, end)的区间进行求和,作为新线程的入口函数
void part_sum(const std::vector<int> &vec, int begin, int end, int &result){
for(int i=begin; i<end; i++) {
std::unique_lock lock(mtx);
result += vec[i];
}
}
// int parallel_sum(...)省略了
3.2 原子变量
由于线程的休眠与唤醒涉及到系统调用,并且会发生寄存器的保存与恢复,以及堆栈的换出和换入,是一个时间开销比较大的操作。而上面的例子中,从线程A成功加锁到释放锁,期间只间隔了一次内存的读取与写入。在这段时间中,线程B甚至可能还没休眠完毕(虽然有些标准库实现的std::mutex不会在加锁失败后立即休眠,但这里我们暂时不考虑这种情况)。这意味着程序的运行效率会受到极大的影响。
针对仅对一次内存读写进行加锁的情况,我们可以使用原子变量,即std::atomic<>。其中模板参数类型是需要进行原子读写的变量类型。在原子变量的帮助下,之前的代码可以改写成下面的样子。
// 此函数对一个数组[begin, end)的区间进行求和,作为新线程的入口函数
void part_sum(const std::vector<int> &vec, int begin, int end, std::atomic<int> &result){
for(int i=begin; i<end; i++) result += vec[i];
}
// 此函数使用两个线程,对一个数组进行求和
int parallel_sum(const std::vector<int> &vec){
auto len = vec.size();
std::atomic<int> result = 0;
std::thread t1([&](){ part_sum(vec, 0, len/2, result); });
std::thread t2([&](){ part_sum(vec, len/2, len, result); });
t1.join();
t2.join();
return result;
}
那么原子变量和互斥锁的差别在哪里呢?简单的理解就是,互斥锁是通过线程休眠来等待其他线程完成当前任务,而原子变量是通过CPU空转来等待其他线程完成当前任务(准确的说是通过总线锁,但表现形式上和空转等待差不多)。因为没有发生线程切换,所以也就节省了线程切换时的开销。
3.3 条件变量
在有些场景下,一个线程可能希望满足一个条件后才继续运行,否则线程休眠等待该条件的发生。最典型的情况就是线程等待用户的输入,在没有用户输入时,线程直接休眠等待输入。这种情况就可以使用条件变量。
using namespace std::chrono;
// 定义条件变量
std::condition_variable cv;
// 条件变量需要配合互斥锁使用
std::mutex mtx;
// 定义一个函数模拟用户产生输入
void user(std::string &str, bool &is_exit){
// 共计产生100个输入
for(int i=0; i<100; i++){
{
std::unique_lock lock(mtx);
// 产生一个输入
str = "input " + std::to_string(i);
// 唤醒一个等待中的线程
cv.notify_one();
}
// 线程休眠1s,模拟用户每秒产生一个输入
std::this_thread::sleep_for(1s);
}
is_exit = true;
// 唤醒一个等待中的线程
cv.notify_one();
}
int main(){
bool is_exit = false; // 用于标志线程是否运行完毕
std::string str; // 用于接收用户输入
std::thread t1(user, str, is_exit); // 启动模拟用户输入线程
while(true){
std::unique_lock lock(mtx);
// 这个条件变量等待的条件是,线程执行完毕或字符串非空(表示有用户输入)
cv.wait(lock, [](){return is_exit || !str.empty()});
// 如果线程结束,则退出主循环
if(is_exit) break;
// 打印用户输入,并清空用户输入,用于接收下一个用户输入
std::cout << str << std::endl;
str.clear();
}
}
从上面的代码中可以注意到,条件变量的使用有几个关键点:
- 必须配合互斥锁使用
- 每当条件变量等待的条件发生变化,必须手动调用条件变量的唤醒函数notify_one()或notify_all()
四、作业与思考题
- 互斥锁能不能采用CPU空转的方式来等待其他线程释放锁?为什么?
- 条件变量为什么要和互斥锁配合使用?
- 将最后一个例子改成有多个模拟用户输入的线程同时运行,同时要求最先产生的用户输入编号为0,后产生的依次+1。而主循环打印出的用户输入也应该是从0开始依次加一。即
input 0 input 1 input 2 ...
。提示:可以结合条件变量和原子变量一起使用。
作者:唐欣阳,github主页:传送门。