并行程序的潜在问题 (一)

建立 Thread 的成本远低於 Process 的成本,执行 Context switch 时的效能也有显着的差异。既然并行程序这麽好,为什麽资工系不在一开始就将相关技巧教给学生、甚至是到毕业都没学到呢?
很显然的,当执行绪一多时,如果开发者的头脑不够清楚便会产出各式各样的问题以及臭虫,本篇文章整理了一些并行程序的眉眉角角,欢迎大家一同学习并给予指教。

Race condition

Race condition 用来描述一个系统或者 Process 的输出依赖於不受控制的事件出现顺序、出现时机。
像是: 有多个 Process 尝试存取同一个记忆体位置,如果没有处理好,就有可能发生无法预期的执行结果,这种情况容易发生在错误的後端程序、资料库、档案系统设计或是其他采用多执行绪设计的程序。

范例

考虑以下程序码:

#include <pthread.h>
void* myfunc(void* ptr) {
    int i = *((int *) ptr);
    printf("%d ", i);
    return NULL;
}

int main() {
    int i;
    pthread_t tid;
    for(i =0; i < 10; i++) {
        pthread_create(&tid, NULL, myfunc, &i);
    }
    pthread_exit(NULL);
}

上面的范例使用 POSIX Thread 建立 10 个执行绪,其程序目的是希望能够接连印出 0 - 9 的数字。
实际上,其执行结果与预期行为大有不同!程序执行後,印出的结果会在不同数字间跳动,其原因在查看 pthread_create() 的定义後便能轻松找出:

int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
                          void *(*start_routine) (void *), void *arg);

我们可以看到,传递到子执行绪的参数必须是 void 指标,也就代表,我们并不是传递实体数字给子执行绪,是只交给他参数的记忆体位址而已。
也因为这样,当作业系统在各执行绪间反覆切换时各个子执行绪就会同时操作同一个记忆体的资料,无法确保执行绪的执行顺序,这个情况就是 Race condition

解决办法

SystemProgramming 的教材中也提供了问题的解法,其解题思维就是避免多个执行绪同时存取相同的记忆体。
首先,先定义一个结构:

struct T {
  pthread_t id;
  int start;
  char result[100];
};

定义好後,进行记忆体分配:

struct T *info = calloc(10 , sizeof(struct T));

做到这一步,就已经为即将建立的 10 个子执行绪保留好位置了,接着我们可以将相关参数传给 pthread_create() 做执行绪的建立工作:

pthread_create(&info[i].id, NULL, func, &info[i]);

Mutex lock && Condition variables

在介绍 Mutex lock 与 Condition variable 之前,我们要先了解他的应用场景。

Critical sections

Critical sections 代表某程序区段只能在同一时间被一个执行绪处理,如果有多个执行绪同时执行了这段程序码,可能会有超出预期的错误行为出现。

取自 CS:APP Concurrent Programming, Page 40-56 。

范例

考虑以下程序码:

int sum = 0;

void *countgold(void *param)
{
    int i;
    for (i = 0; i < 10000000; i++)
    {
        sum += 1;
    }
    return NULL;
}

该程序码若是在单一执行绪上工作,并不会出现问题。
不过!如果使用 POSIX Thread 建立多个执行绪处理 countgold() 时,由於多个执行绪同时存取同一个记忆体的资料 (在这个范例中为 sum ),便会造成前面提到的 Race condition。

解决方法

要避免 Race condition,我们只需要预防 Critical sections 在多个执行绪同时执行,我们先标示出程序中的 Critical sections:

// ...
{
    for (i = 0; i < 10000000; i++)
    {
        // critical section!
        sum += 1;
        // end
    }
}
// ...    

问题的解决办法就是: 在存取 sum 变数行为的前面加上一道锁,任何执行绪访问他之前都需要将锁上锁,等到操作完再由上锁的执行绪做解锁。
知道了锁的需求以及应用面後,我们就可以来认识 Mutex lock 啦!

Mutex lock API

使用 Mutex lock 前须详阅公开说明书,互斥锁有以下特性:

  • 谁负责上锁就由谁负责解锁
  • 在销毁锁之前,必须确保没有执行绪被这个锁 block
  • Mutex lock 有专属的 type: pthread_mutex_t
#include <pthread.h>
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER; // 初始化-法1
pthread_mutex_init(&m, NULL); // 初始化-法2
pthread_mutex_lock(&m);   // 上锁
pthread_mutex_unlock(&m); // 解锁
pthread_mutex_destroy(&m) // 销毁锁

使用 PTHREAD_MUTEX_INITIALIZER 初始化时,必须确保锁是全域变数!

掌握 Mutex lock 後,我们将上面的范例稍做修改:

int sum = 0;
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
void *countgold(void *param)
{
    int i;
    pthread_mutex_lock(&m);
    for (i = 0; i < 10000000; i++)
    {
        sum += 1;
    }
    pthread_mutex_unlock(&m);
    return NULL;
}

上面的范例会确保执行绪执行完 10000000 次的回圈才做解锁,当然,我们也可以换个做法:

void *countgold(void *param)
{
    int i;
    for (i = 0; i < 10000000; i++)
    {
        pthread_mutex_lock(&m);
        sum += 1;
        pthread_mutex_unlock(&m);
    }
    return NULL;
}

这样的做法同样能预防竞争危害,不过执行时间会更长。

注意事项!

使用 mutex lock 时,多个执行绪仍然会因为作业系统排程不断的做 Context switch ,概念有点像是:

  1. Main-Thread: 等待所有 user-level 的 thread 都 join。
  2. Thread-1: 我现在将 sum 给上锁了,正在执行操作....。
  3. Thread-2: 我没办法对 sum 操作,继续等吧。
  4. Main-Thread: 等待所有 user-level 的 thread 都 join。
  5. Thread-1: 我现在将 sum 给上锁了,正在执行操作....。
  6. Thread-2: 我没办法对 sum 操作,继续等吧。
  7. Main-Thread: 等待所有 user-level 的 thread 都 join。
  8. Thread-1: 我对 sum 的操作结束了,解锁并 Join 。
  9. Thread-2: 现在没上锁,我要上锁了。
  10. Main-Thread: 等待所有 user-level 的 thread 都 join。
  11. Thread-2: 我现在将 sum 给上锁了,正在执行操作....。
  12. Thread-2: 我对 sum 的操作结束了,解锁并 Join。
  13. Main-Thread: 所有 user-level 的 thread 都 join 了,程序结束。

不只如此,程序设计者需要确保多个执行绪尝试上的是同一个锁!

#include <stdio.h>
#include <pthread.h>
int sum = 0;
pthread_mutex_t m1 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t m2 = PTHREAD_MUTEX_INITIALIZER;
void *countgold1(void *param)
{
    int i;
    pthread_mutex_lock(&m1);
    for (i = 0; i < 10000000; i++)
    {
        sum += 1;
    }
    pthread_mutex_unlock(&m1);
    return NULL;
}
void *countgold2(void *param)
{
    int i;
    pthread_mutex_lock(&m2);
    for (i = 0; i < 10000000; i++)
    {
        sum += 1;
    }
    pthread_mutex_unlock(&m2);
    return NULL;
}
int main() {
    pthread_t tid1, tid2;
    pthread_create(&tid1, NULL, countgold1, NULL);
    pthread_create(&tid2, NULL, countgold2, NULL);

    //Wait for both threads to finish:
    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);

    printf("ARRRRG sum is %d\n", sum);
    return 0;
}

在上面范例中,执行绪 1 以及 2 都各自上了锁,两者同时开心的对 sum 变数执行操作,race condition 的状况一样发生在该程序身上。

Condition variables

Condition variables 通常会跟 Mutex 搭配使用。

int pthread_cond_init(pthread_cond_t *cond,const pthread_condattr_t  *attr);
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_wait(pthread_cond_t *cond,pthread_mutex_t *mutex);
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond); // all threads waiting on a condition need to be woken up


解释:

  1. 检查 mutex 是否上锁?若否则上锁并向下执行
  2. 使用 Condition variables 时,我们会设立一些判断条件,视情况继续执行或是呼叫 pthread_cond_wait() 解锁并进入 waiting queue。
  3. 若经过条件判断後,Task-1 继续执行,便会执行到 Task-1 结束前才解锁。
  4. 如果上锁,则进入 waiting queue,否则上锁并执行程序码,并且当 Task-2 更改判断条件後须使用 pthread_cond_signal() 更改 Condition variable 的状态。

使用 Condition variables 改写生产者/消费者问题

修改程序码之前,需要知道 pthread_cond_wait() 被呼叫後会有以下行为:

  1. 解锁,供其他执行绪上锁
  2. 进入休眠,直到收到 pthread_cond_signal()
  3. 被唤醒後,要重新上锁!

了解 API 的行为後,考虑以下程序码:

int buffer[10];
int *in;
int count = 0;
void *producer(void *arg){
	for(;;){
		while(count == MAX_SIZE);
		pthread_mutex_lock(&lock);
		buffer[count] = 1;
		count++;
		printf("%d", count);
		pthread_mutex_unlock(&lock);
	}
}
void *consumer(void *arg){
	for(;;){
		while(count == 0);
		pthread_mutex_lock(&lock);
		count--;
		printf("%d", count);
		pthread_mutex_unlock(&lock);
	}
}

使用 Condition variables 改写:

pthread_cond_t notFull = PTHREAD_COND_INITIALIZER;
pthread_cond_t notEmpty = PTHREAD_COND_INITIALIZER;
int buffer[10];
int *in;
int count = 0;
void *producer(void *arg){
	for(;;){
		while(count == MAX_SIZE)
				pthread_cond_wait(notFull, lock);
		pthread_mutex_lock(&lock);
		buffer[count] = 1;
		count++;
		pthread_cond_signal(notEmpty);
		printf("%d", count);
		pthread_mutex_unlock(&lock);
	}
}
void *consumer(void *arg){
	for(;;){
		while(count == 0);
				pthread_cond_wait(notEmpty, lock);
		pthread_mutex_lock(&lock);
		count--;
		pthread_cond_signal(notFull);
		printf("%d", count);
		pthread_mutex_unlock(&lock);
	}
}

Reference


<<:  Day 29 隐私规划与UI设计定义实作

>>:  Day28 Android - tablayout+fragment

[Day 11] SRE - 事後检讨,拜托拜托让我吸个经验值

从历史中学习 我们最讨厌事件历史重演QQ 在每次遇到问题後,我们全员都会一起开个检讨会议,当中会提到...

JS this DAY64

this 看到这个是不是很头痛??? 但别怕 接着往下看 this 基本观念 每个执行环境都有属於自...

创建App後半部界面

在已建设的登入界面与App主界面後,今天来建设App的後部分界面,也就是功能主要在的大部分,其中更分...

D14 - 「类比×电压×输入」:类比功能

先从 Firmata 找到类比相关功能。 类比输入(Analog Input) 在 Supporte...

Day 10:架设 Prometheus (2)

设定 alerting rules 昨天成功的让 Prometheus 收集了一些指标,那麽今天就来...