视觉教程第三弹:cpp进阶之多线程

视觉教程第三弹:cpp进阶之多线程

机械是血肉,电控是大脑,视觉是灵魂。


现代CPU几乎总是有多个核心可以参与计算,尤其是视觉会用到的各类计算平台,没有个4核8核什么的都不好意思拿出来卖。

虽然有多个CPU内核,但常规的编程都是顺序执行,只能由一个CPU内核进行计算。如果计算任务繁重,就会造成“一核有难,八核围观”的现象,此时看CPU监视器就会发现,一个核心的占用率几乎100%,而其他核心却基本在10%以内。显然这样一个软件设计,没有能利用上所有的硬件资源,造成了资源浪费,同时执行效率也不高。

本节我们主要探讨如何使用cpp进行多线程编程,以及多线程编程时的各类需要注意的点。

一、什么是多线程

线程是操作系统进行调度的最小单位,区别于进程,多线程共享同一个地址空间,意味着多线程之间的资源共享的开销会远小于多进程。

当程序启动了多个线程时,每个线程从逻辑上是并行运行的。但由于CPU核心数的限制,CPU无法同时并行执行大于核心数的任务。故当线程数大于核心数时,部分线程的执行并不是真正意义上的并行执行。而且由于操作系统上往往不会只运行这一个程序,所以CPU还需要腾出时间执行其他的程序,故实际的线程并行情况会再少一点。

当线程数大于CPU物理核心数时(最典型的例子是在单片机上,CPU通常只有一个核心),如果线程A正在运行,那么线程B一定是无法运行的,因为只有一个CPU内核嘛。线程B想要运行,需要线程A释放出CPU使用权,而释放出CPU使用权通常有两种释放方式:

  • 线程A主动让出CPU使用权。通常是因为线程A需要等待一些外部资源(如用户输入)而主动进入休眠状态。
  • 线程A被动让出CPU使用权。这种情况通常是由于产生了一个中断,打断了线程A的执行,而在中断退出前,操作系统往往会重新判断一下当前CPU使用权应该交给谁,如果操作系统决定将CPU使用权交给线程B(比如操作系统发现线程A长时间占用CPU),那么线程A就会被动交出CPU使用权。

由于存在被动交出CPU使用权的现象,程序员往往不能确定当前线程会在何时失去CPU使用权

二、什么时候用多线程

使用多线程的场景有很多,但他们都有一个共同的特点,被执行的任务是可分的。如果一个任务是不可分的,那么显然这样一个任务无法通过多线程执行。下面给出一个例子,对一个数组进行求和。显然,对一个数组进行求和,可以把数组进行拆分单独求和,再加起来。

// 此函数对一个数组[begin, end)的区间进行求和,作为新线程的入口函数
int part_sum(const std::vector<int> &vec, int begin, int end){
    int val = 0;
    for(int i=begin; i<end; i++) val += vec[i];
    return val;
}

// 此函数使用两个线程,对一个数组进行求和
int parallel_sum(const std::vector<int> &vec){
    auto len = vec.size();
    int sum1, sum2;
    std::thread t1([&](){ sum1 = part_sum(vec, 0, len/2); });
    std::thread t2([&](){ sum2 = part_sum(vec, len/2, len); });
    t1.join();
    t2.join();
    return sum1 + sum2;
}

上面的代码出现了几个比较新的东西:std::thread,lambda表达式,join()。其中lambda表达式不是这章要讲的内容,如果不了解可以自行百度。

std::thread是一个类型,负责管理一个线程。其构造函数第一个参数是新线程的入口函数,而之后的参数则是想要传递给入口函数的参数,如果没有需要传递的参数,可以不写。在成功定义了一个std::thread变量,并指定了入口函数后,新线程将直接启动运行。

join()函数是std::thread的一个成员函数,其功能是等待该线程执行完毕,再执行完毕前join()函数都不会返回。

反观上面的代码,首先将数组对半分为两个部分,并对于每个部分都启动一个线程进行求和,然后等待这两个线程计算完毕,最后合并两个线程的计算结果。很常规的一个多线程代码。在实际情况中,可以根据CPU的物理核心数来划分这个数组,并启动对应个数的线程进行计算,充分利用CPU资源。

三、多线程同步

注意到,上面的代码中,我们为每个线程单独定义了一个变量来保存中间计算结果,而最后合并这些中间计算结果,得到最终结果。为何要多此一举,直接让每个线程把数组的值累加到一个公共变量中可以吗?

// 此函数对一个数组[begin, end)的区间进行求和,作为新线程的入口函数
void part_sum(const std::vector<int> &vec, int begin, int end, int &result){
    for(int i=begin; i<end; i++) result += vec[i];
}

// 此函数使用两个线程,对一个数组进行求和
int parallel_sum(const std::vector<int> &vec){
    auto len = vec.size();
    int result = 0;
    std::thread t1([&](){ part_sum(vec, 0, len/2, result); });
    std::thread t2([&](){ part_sum(vec, len/2, len, result); });
    t1.join();
    t2.join();
    return result;
}

在上面这个代码中,我们让每个线程直接把数组的值累加到一个公共的变量中。有兴趣的话,可以执行一下上面的代码,看看结果是什么。

这里可以告诉你们,结果通常不会是正确的,尤其是在数组比较大时,而且每次的结果还可能不一样。这里可能会导致很多刚接触多线程的人感到迷惑,同样的代码,怎么可能每次的计算结果不一样呢?这不科学啊。

关键的问题出在这句代码result += vec[i];我们可以分析一下,这句代码翻译成CPU指令需要几条指令。显然,首先从内存中读取result到寄存器,在寄存器内做加法,再写回内存。由于多线程是并行运行的,假如线程一读取内存后,还没把计算结果写回内存,线程二就读取了内存,这会导致线程一的计算结果丢失。这里用一个流程图来解释。假设初始内存值为0,每次加1。

  • 线程A读取内存——此时内存内的值为0,线程A所在CPU寄存器内的值为0
  • 线程A执行加法计算——此时内存内的值为0,线程A所在CPU寄存器内的值为1
  • 线程A回写内存——此时内存内的值为1,线程A所在CPU寄存器内的值为1
  • 线程B读取内存——此时内存内的值为1,线程B所在CPU寄存器内的值为1
  • 线程B执行加法计算——此时内存内的值为1,线程B所在CPU寄存器内的值为2
  • 线程B回写内存——此时内存内的值为2,线程B所在CPU寄存器内的值为2

上面是一次正常的流程。最后内存内的值是2。

  • 线程A读取内存——此时内存内的值为0,线程A所在CPU寄存器内的值为0
  • 线程A执行加法计算——此时内存内的值为0,线程A所在CPU寄存器内的值为1
  • 线程B读取内存——此时内存内的值为0,线程B所在CPU寄存器内的值为0
  • 线程A回写内存——此时内存内的值为1,线程A所在CPU寄存器内的值为1
  • 线程B执行加法计算——此时内存内的值为1,线程B所在CPU寄存器内的值为1
  • 线程B回写内存——此时内存内的值为1,线程B所在CPU寄存器内的值为1

上面是一次错误的流程。最后内存内的值是1。

由于线程执行的顺序,在不加控制时,是无法确定的。所以这就会导致每次运行的结果都不一样。

我们需要一种手段,用于控制线程的执行顺序,这种手段叫做线程同步。

3.1 互斥锁

在上面的例子中,为了保证写入不出错,最简单的方法是什么呢?当一个线程读取了该内存,但还未写入时,禁止其他线程读取该内存就好了嘛。由于一个线程读取内存后,可能还需要这个值进行一些运算(这里是加法运算),才会写回内存,所以上面的方法也可以说成,某一块代码区域不允许两个线程同时执行,必须等上一个线程执行完毕后,才允许下一个线程执行。

为了实现这个功能,我们可以使用std::mutex。下面给出它的使用方法。

// 定义一个std::mutex变量,用于进行线程同步
std::mutex mtx;

// 此函数对一个数组[begin, end)的区间进行求和,作为新线程的入口函数
void part_sum(const std::vector<int> &vec, int begin, int end, int &result){
    for(int i=begin; i<end; i++) {
        mtx.lock();
        result += vec[i];
        mtx.unlock();
    }
}

// 此函数使用两个线程,对一个数组进行求和
int parallel_sum(const std::vector<int> &vec){
    auto len = vec.size();
    int result = 0;
    std::thread t1([&](){ part_sum(vec, 0, len/2, result); });
    std::thread t2([&](){ part_sum(vec, len/2, len, result); });
    t1.join();
    t2.join();
    return result;
}

对比之前的代码,主要是多出了一对lock()和unlock()代码。在调用lock()时,std::mutex会检查有没有其他线程已经调用过lock()并且还没有unlock(),如果有,那么当前线程进入休眠状态,等待其他线程unlock()后唤醒当前线程;如果没有,那么lock()函数直接返回。

std::mutex也被称为互斥锁,同一时间只能被一个线程上锁,其他线程想上锁时,必须等待当前锁被释放后,才能成功上锁。

也就是说,std::mutex可以保证,处于同一个mutex对象下的一对lock()和unlock()间的代码,不会有多个线程在同时运行。

下面给出一个多线程使用std::mutex进行同步的流程图,同样使用上面的例子。

  • 线程A尝试上锁,由于此时锁没有被上锁,线程A上锁成功
  • 线程A读取内存——此时内存内的值为0,线程A所在CPU寄存器内的值为0
  • 线程A执行加法计算——此时内存内的值为0,线程A所在CPU寄存器内的值为1
  • 线程B尝试上锁,由于此时锁已被上锁,线程B上锁失败,线程B休眠
  • 线程A回写内存——此时内存内的值为1,线程A所在CPU寄存器内的值为1
  • 线程A释放锁,由于此时线程B正在休眠等待锁,所以线程B被唤醒。
  • 线程B读取内存——此时内存内的值为1,线程B所在CPU寄存器内的值为1
  • 线程B执行加法计算——此时内存内的值为1,线程B所在CPU寄存器内的值为2
  • 线程B回写内存——此时内存内的值为2,线程B所在CPU寄存器内的值为2
  • 线程B释放锁,由于此时没有其他线程等待锁,所以无事发生。

在std::mutex的帮助下,最后的结果就不会是错误的了。

由于std::mutex需要手动unlock(),当一个代码段有多个退出点时,需要在每个退出点都加上unlock(),容易漏。而且如果该代码段还会抛出异常,手动unlock()会更加麻烦。这时我们通常会使用std::unique_lock或std::scoped_lock,这两个类型的作用就是在构造这个类型的对象时,传入一个std::mutex,随后会自动加锁,在对象析构时,也会自动释放锁。在它们的帮助下,上面的代码可以修改为:

// 定义一个std::mutex变量,用于进行线程同步
std::mutex mtx;

// 此函数对一个数组[begin, end)的区间进行求和,作为新线程的入口函数
void part_sum(const std::vector<int> &vec, int begin, int end, int &result){
    for(int i=begin; i<end; i++) {
        std::unique_lock lock(mtx);
        result += vec[i];
    }
}

// int parallel_sum(...)省略了

3.2 原子变量

由于线程的休眠与唤醒涉及到系统调用,并且会发生寄存器的保存与恢复,以及堆栈的换出和换入,是一个时间开销比较大的操作。而上面的例子中,从线程A成功加锁到释放锁,期间只间隔了一次内存的读取与写入。在这段时间中,线程B甚至可能还没休眠完毕(虽然有些标准库实现的std::mutex不会在加锁失败后立即休眠,但这里我们暂时不考虑这种情况)。这意味着程序的运行效率会受到极大的影响。

针对仅对一次内存读写进行加锁的情况,我们可以使用原子变量,即std::atomic<>。其中模板参数类型是需要进行原子读写的变量类型。在原子变量的帮助下,之前的代码可以改写成下面的样子。

// 此函数对一个数组[begin, end)的区间进行求和,作为新线程的入口函数
void part_sum(const std::vector<int> &vec, int begin, int end, std::atomic<int> &result){
    for(int i=begin; i<end; i++) result += vec[i];
}

// 此函数使用两个线程,对一个数组进行求和
int parallel_sum(const std::vector<int> &vec){
    auto len = vec.size();
    std::atomic<int> result = 0;
    std::thread t1([&](){ part_sum(vec, 0, len/2, result); });
    std::thread t2([&](){ part_sum(vec, len/2, len, result); });
    t1.join();
    t2.join();
    return result;
}

那么原子变量和互斥锁的差别在哪里呢?简单的理解就是,互斥锁是通过线程休眠来等待其他线程完成当前任务,而原子变量是通过CPU空转来等待其他线程完成当前任务(准确的说是通过总线锁,但表现形式上和空转等待差不多)。因为没有发生线程切换,所以也就节省了线程切换时的开销。

3.3 条件变量

在有些场景下,一个线程可能希望满足一个条件后才继续运行,否则线程休眠等待该条件的发生。最典型的情况就是线程等待用户的输入,在没有用户输入时,线程直接休眠等待输入。这种情况就可以使用条件变量。

using namespace std::chrono;

// 定义条件变量
std::condition_variable cv;
// 条件变量需要配合互斥锁使用
std::mutex mtx;

// 定义一个函数模拟用户产生输入
void user(std::string &str, bool &is_exit){
    // 共计产生100个输入
    for(int i=0; i<100; i++){
        {
            std::unique_lock lock(mtx);
            // 产生一个输入
            str = "input " + std::to_string(i);
            // 唤醒一个等待中的线程
            cv.notify_one();
        }
        // 线程休眠1s,模拟用户每秒产生一个输入
        std::this_thread::sleep_for(1s);
    }
    is_exit = true;
    // 唤醒一个等待中的线程
    cv.notify_one();
}

int main(){
    bool is_exit = false;                // 用于标志线程是否运行完毕
    std::string str;                     // 用于接收用户输入
    std::thread t1(user, str, is_exit);  // 启动模拟用户输入线程
    while(true){
        std::unique_lock lock(mtx);
        // 这个条件变量等待的条件是,线程执行完毕或字符串非空(表示有用户输入)
        cv.wait(lock, [](){return is_exit || !str.empty()});
        // 如果线程结束,则退出主循环
        if(is_exit) break;
        // 打印用户输入,并清空用户输入,用于接收下一个用户输入
        std::cout << str << std::endl;
        str.clear();
    }
}

从上面的代码中可以注意到,条件变量的使用有几个关键点:

  • 必须配合互斥锁使用
  • 每当条件变量等待的条件发生变化,必须手动调用条件变量的唤醒函数notify_one()或notify_all()

四、作业与思考题

  • 互斥锁能不能采用CPU空转的方式来等待其他线程释放锁?为什么?
  • 条件变量为什么要和互斥锁配合使用?
  • 将最后一个例子改成有多个模拟用户输入的线程同时运行,同时要求最先产生的用户输入编号为0,后产生的依次+1。而主循环打印出的用户输入也应该是从0开始依次加一。即input 0 input 1 input 2 ...。提示:可以结合条件变量和原子变量一起使用。

作者:唐欣阳,github主页:传送门