Stanford CS149 (Fall 2023) Parallel Computing

Programming Assignment 2: Scheduling Task Graphs on a Multi-Core CPU

GitHub repo

Part A: Synchronous Bulk Task Launch

Build three different task execution engines; the hard part is understanding the requirements.

Step 1: Move to a Parallel Task System

A plain multi-threaded version. Dynamic task assignment feels slightly wasteful, but static assignment can perform poorly when task runtimes differ.

Since the threads race on the shared task counter, a lock or an atomic operation is needed.

class TaskSystemParallelSpawn : public ITaskSystem {
private:
  int num_threads_;
  std::thread *threads_;

public:
  TaskSystemParallelSpawn(int num_threads);
  ~TaskSystemParallelSpawn();
  const char *name();
  void run(IRunnable *runnable, int num_total_tasks);
  TaskID runAsyncWithDeps(IRunnable *runnable, int num_total_tasks,
                          const std::vector<TaskID> &deps);
  void sync();
};

TaskSystemParallelSpawn::TaskSystemParallelSpawn(int num_threads)
    : ITaskSystem(num_threads), num_threads_(num_threads),
      threads_(new std::thread[num_threads]) {}

TaskSystemParallelSpawn::~TaskSystemParallelSpawn() { delete[] threads_; }

void TaskSystemParallelSpawn::run(IRunnable *runnable, int num_total_tasks) {
  // Dynamic assignment: every worker pulls the next task id from a shared
  // atomic counter until all task ids have been claimed.
  std::atomic_int task_id(0);
  for (int i = 0; i < num_threads_; ++i) {
    threads_[i] = std::thread([&] {
      while (true) {
        int id = task_id.fetch_add(1);
        if (id >= num_total_tasks) {
          break;
        }
        runnable->runTask(id, num_total_tasks);
      }
    });
  }

  for (int i = 0; i < num_threads_; ++i) {
    threads_[i].join();
  }
}
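For contrast with the dynamic counter above, a static-assignment version would hand each thread a fixed stride of task ids up front. A minimal sketch (not the submitted code; runStatic is a hypothetical free function):

#include <thread>
#include <vector>

// Hypothetical static-assignment variant: thread i handles task ids
// i, i + num_threads, i + 2 * num_threads, ... No shared counter is needed,
// but a thread whose tasks happen to be cheap finishes early and sits idle
// while the others keep working.
static void runStatic(IRunnable *runnable, int num_total_tasks,
                      int num_threads) {
  std::vector<std::thread> threads;
  threads.reserve(num_threads);
  for (int i = 0; i < num_threads; ++i) {
    threads.emplace_back([=] {
      for (int id = i; id < num_total_tasks; id += num_threads) {
        runnable->runTask(id, num_total_tasks);
      }
    });
  }
  for (auto &t : threads) {
    t.join();
  }
}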

Step 2: Avoid Frequent Thread Creation Using a Thread Pool

Thread-pool version: spawn num_threads worker threads in the background that wait for work. Each task fetch simply takes one big lock; doing it with atomics alone seems hard.

class TaskSystemParallelThreadPoolSpinning : public ITaskSystem {
private:
  int num_threads_;
  std::thread *threads_;
  bool end_;

  IRunnable *runnable_;
  int num_total_tasks_;
  int current_task_id_;
  std::mutex mutex_;
  std::atomic_int num_finished_task_;

public:
  TaskSystemParallelThreadPoolSpinning(int num_threads);
  ~TaskSystemParallelThreadPoolSpinning();
  const char *name();
  void run(IRunnable *runnable, int num_total_tasks);
  TaskID runAsyncWithDeps(IRunnable *runnable, int num_total_tasks,
                          const std::vector<TaskID> &deps);
  void sync();
};

TaskSystemParallelThreadPoolSpinning::TaskSystemParallelThreadPoolSpinning(
    int num_threads)
    : ITaskSystem(num_threads), num_threads_(num_threads),
      threads_(new std::thread[num_threads]), end_(false), runnable_(nullptr) {
  for (int i = 0; i < num_threads; ++i) {
    threads_[i] = std::thread([&] {
      while (true) {
        // Grab the next task id under the big lock; clear runnable_ once
        // the last id of the current launch has been handed out.
        IRunnable *runnable = nullptr;
        int task_id;
        mutex_.lock();
        if (runnable_) {
          runnable = runnable_;
          task_id = current_task_id_++;
          if (current_task_id_ >= num_total_tasks_) {
            runnable_ = nullptr;
          }
        }
        mutex_.unlock();
        if (runnable) {
          runnable->runTask(task_id, num_total_tasks_);
          num_finished_task_.fetch_add(1);
        } else if (end_) {
          break;
        }
      }
    });
  }
}

TaskSystemParallelThreadPoolSpinning::~TaskSystemParallelThreadPoolSpinning() {
  end_ = true;
  for (int i = 0; i < num_threads_; ++i) {
    threads_[i].join();
  }
  delete[] threads_;
}

void TaskSystemParallelThreadPoolSpinning::run(IRunnable *runnable,
                                               int num_total_tasks) {
  mutex_.lock();
  runnable_ = runnable;
  num_total_tasks_ = num_total_tasks;
  current_task_id_ = 0;
  num_finished_task_ = 0;
  mutex_.unlock();
  // Busy-wait until every task of this launch has completed.
  while (num_finished_task_ < num_total_tasks) {
    // spin
  }
}
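For reference, the harness exercises these engines through the IRunnable interface: runTask is called once for every task id in a bulk launch. A minimal, hypothetical IRunnable and driver call might look like this:

// Hypothetical IRunnable that squares each element of an array;
// the task system calls runTask once for every id in [0, num_total_tasks).
class SquareTask : public IRunnable {
public:
  explicit SquareTask(int *data) : data_(data) {}
  void runTask(int task_id, int num_total_tasks) override {
    data_[task_id] = data_[task_id] * data_[task_id];
  }

private:
  int *data_;
};

// Usage sketch: run() returns only after all num_total_tasks tasks finish.
//   int data[64] = {/* ... */};
//   SquareTask task(data);
//   TaskSystemParallelThreadPoolSpinning ts(8);
//   ts.run(&task, 64);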

Step 3: Put Threads to Sleep When There is Nothing to Do

Building on the previous version, use a condition variable to put idle threads to sleep.

This is my first time really using condition variables; with so little practice, they are genuinely painful to get right.

Also, the main thread busy-waits (spins) for completion; the compiler optimized the repeated reads away and caused an infinite loop, which I fixed by marking the flags volatile.

class TaskSystemParallelThreadPoolSleeping : public ITaskSystem {
private:
  int num_threads_;
  std::thread *threads_;
  bool end_;

  IRunnable *volatile runnable_;
  int num_total_tasks_;
  int current_task_id_;
  std::mutex mutex_;

  volatile int num_sleeping_threads_;
  std::condition_variable cv_;

public:
  TaskSystemParallelThreadPoolSleeping(int num_threads);
  ~TaskSystemParallelThreadPoolSleeping();
  const char *name();
  void run(IRunnable *runnable, int num_total_tasks);
  TaskID runAsyncWithDeps(IRunnable *runnable, int num_total_tasks,
                          const std::vector<TaskID> &deps);
  void sync();
};

TaskSystemParallelThreadPoolSleeping::TaskSystemParallelThreadPoolSleeping(
    int num_threads)
    : ITaskSystem(num_threads), num_threads_(num_threads),
      threads_(new std::thread[num_threads]), end_(false), runnable_(nullptr),
      num_sleeping_threads_(0) {
  for (int i = 0; i < num_threads; ++i) {
    threads_[i] = std::thread([&] {
      while (true) {
        if (end_) {
          break;
        }
        std::unique_lock<std::mutex> lock(mutex_);
        if (IRunnable *runnable = runnable_) {
          // Claim one task id; the last claim clears runnable_ so other
          // workers go back to sleep.
          int task_id = current_task_id_++;
          if (current_task_id_ >= num_total_tasks_) {
            runnable_ = nullptr;
          }
          lock.unlock();
          runnable->runTask(task_id, num_total_tasks_);
        } else {
          // No work: sleep on the condition variable until run() or the
          // destructor wakes us up.
          num_sleeping_threads_++;
          cv_.wait(lock);
          num_sleeping_threads_--;
        }
      }
    });
  }
}

TaskSystemParallelThreadPoolSleeping::~TaskSystemParallelThreadPoolSleeping() {
  end_ = true;
  // Keep notifying until every sleeping worker has woken up and exited.
  while (num_sleeping_threads_ > 0) {
    cv_.notify_all();
  }
  for (int i = 0; i < num_threads_; ++i) {
    threads_[i].join();
  }
  delete[] threads_;
}

void TaskSystemParallelThreadPoolSleeping::run(IRunnable *runnable,
                                               int num_total_tasks) {
  std::unique_lock<std::mutex> lock(mutex_);
  runnable_ = runnable;
  num_total_tasks_ = num_total_tasks;
  current_task_id_ = 0;
  lock.unlock();
  cv_.notify_all();
  // Spin until all task ids are handed out and every worker is asleep again;
  // volatile keeps the compiler from hoisting these reads out of the loop.
  while (runnable_ || num_sleeping_threads_ < num_threads_) {
    // spin
  }
}
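The volatile spin at the end of run() works, but a more conventional pattern would be to count finished tasks under the same mutex and let run() sleep on the condition variable as well. A small self-contained sketch of that completion-wait pattern (hypothetical CompletionTracker, not the submitted code):

#include <condition_variable>
#include <mutex>

// Sketch of a completion wait: workers increment finished under the mutex
// and notify; run() blocks on the predicate instead of spinning, so no
// volatile is needed and the compiler cannot hoist the check.
struct CompletionTracker {
  std::mutex mutex;
  std::condition_variable cv;
  int finished = 0;

  // Called by each worker after it finishes one task.
  void markDone() {
    {
      std::lock_guard<std::mutex> lock(mutex);
      ++finished;
    }
    cv.notify_all();
  }

  // Called by run() to wait for all tasks of the current launch.
  void waitFor(int num_total_tasks) {
    std::unique_lock<std::mutex> lock(mutex);
    cv.wait(lock, [&] { return finished >= num_total_tasks; });
  }
};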

Final performance results:

~/codes/CS149/asst2/part_a (master*) » python ../tests/run_test_harness.py    mizukicry@S-Terminal
================================================================================
Running task system grading harness... (11 total tests)
- Detected CPU with 16 execution contexts
- Task system configured to use at most 8 threads
================================================================================
================================================================================
Executing test: super_super_light...
Reference binary: ./runtasks_ref_linux
Results for: super_super_light
STUDENT REFERENCE PERF?
[Serial] 6.839 7.323 0.93 (OK)
[Parallel + Always Spawn] 107.754 111.695 0.96 (OK)
[Parallel + Thread Pool + Spin] 7.095 16.314 0.43 (OK)
[Parallel + Thread Pool + Sleep] 57.004 56.313 1.01 (OK)
================================================================================
Executing test: super_light...
Reference binary: ./runtasks_ref_linux
Results for: super_light
STUDENT REFERENCE PERF?
[Serial] 46.945 47.279 0.99 (OK)
[Parallel + Always Spawn] 108.165 109.2 0.99 (OK)
[Parallel + Thread Pool + Spin] 11.698 23.173 0.50 (OK)
[Parallel + Thread Pool + Sleep] 53.053 53.36 0.99 (OK)
================================================================================
Executing test: ping_pong_equal...
Reference binary: ./runtasks_ref_linux
Results for: ping_pong_equal
STUDENT REFERENCE PERF?
[Serial] 756.903 759.672 1.00 (OK)
[Parallel + Always Spawn] 198.972 210.784 0.94 (OK)
[Parallel + Thread Pool + Spin] 136.811 163.145 0.84 (OK)
[Parallel + Thread Pool + Sleep] 154.063 176.665 0.87 (OK)
================================================================================
Executing test: ping_pong_unequal...
Reference binary: ./runtasks_ref_linux
Results for: ping_pong_unequal
STUDENT REFERENCE PERF?
[Serial] 1297.036 1328.119 0.98 (OK)
[Parallel + Always Spawn] 282.762 286.793 0.99 (OK)
[Parallel + Thread Pool + Spin] 223.534 235.168 0.95 (OK)
[Parallel + Thread Pool + Sleep] 238.699 252.337 0.95 (OK)
================================================================================
Executing test: recursive_fibonacci...
Reference binary: ./runtasks_ref_linux
Results for: recursive_fibonacci
STUDENT REFERENCE PERF?
[Serial] 703.682 1324.785 0.53 (OK)
[Parallel + Always Spawn] 116.104 196.208 0.59 (OK)
[Parallel + Thread Pool + Spin] 113.556 202.847 0.56 (OK)
[Parallel + Thread Pool + Sleep] 115.634 192.843 0.60 (OK)
================================================================================
Executing test: math_operations_in_tight_for_loop...
Reference binary: ./runtasks_ref_linux
Results for: math_operations_in_tight_for_loop
STUDENT REFERENCE PERF?
[Serial] 494.335 506.367 0.98 (OK)
[Parallel + Always Spawn] 552.84 546.948 1.01 (OK)
[Parallel + Thread Pool + Spin] 110.831 129.962 0.85 (OK)
[Parallel + Thread Pool + Sleep] 282.095 288.475 0.98 (OK)
================================================================================
Executing test: math_operations_in_tight_for_loop_fewer_tasks...
Reference binary: ./runtasks_ref_linux
Results for: math_operations_in_tight_for_loop_fewer_tasks
STUDENT REFERENCE PERF?
[Serial] 495.172 505.118 0.98 (OK)
[Parallel + Always Spawn] 538.859 541.259 1.00 (OK)
[Parallel + Thread Pool + Spin] 127.93 145.016 0.88 (OK)
[Parallel + Thread Pool + Sleep] 280.496 290.938 0.96 (OK)
================================================================================
Executing test: math_operations_in_tight_for_loop_fan_in...
Reference binary: ./runtasks_ref_linux
Results for: math_operations_in_tight_for_loop_fan_in
STUDENT REFERENCE PERF?
[Serial] 257.69 262.913 0.98 (OK)
[Parallel + Always Spawn] 89.402 90.371 0.99 (OK)
[Parallel + Thread Pool + Spin] 46.508 51.993 0.89 (OK)
[Parallel + Thread Pool + Sleep] 60.783 68.523 0.89 (OK)
================================================================================
Executing test: math_operations_in_tight_for_loop_reduction_tree...
Reference binary: ./runtasks_ref_linux
Results for: math_operations_in_tight_for_loop_reduction_tree
STUDENT REFERENCE PERF?
[Serial] 253.9 258.621 0.98 (OK)
[Parallel + Always Spawn] 52.508 52.753 1.00 (OK)
[Parallel + Thread Pool + Spin] 41.257 42.298 0.98 (OK)
[Parallel + Thread Pool + Sleep] 45.557 46.396 0.98 (OK)
================================================================================
Executing test: spin_between_run_calls...
Reference binary: ./runtasks_ref_linux
Results for: spin_between_run_calls
STUDENT REFERENCE PERF?
[Serial] 249.726 470.106 0.53 (OK)
[Parallel + Always Spawn] 126.316 236.662 0.53 (OK)
[Parallel + Thread Pool + Spin] 147.054 268.436 0.55 (OK)
[Parallel + Thread Pool + Sleep] 126.533 235.999 0.54 (OK)
================================================================================
Executing test: mandelbrot_chunked...
Reference binary: ./runtasks_ref_linux
Results for: mandelbrot_chunked
STUDENT REFERENCE PERF?
[Serial] 331.607 322.024 1.03 (OK)
[Parallel + Always Spawn] 42.408 42.856 0.99 (OK)
[Parallel + Thread Pool + Spin] 42.695 42.811 1.00 (OK)
[Parallel + Thread Pool + Sleep] 42.959 43.082 1.00 (OK)
================================================================================
Overall performance results
[Serial] : All passed Perf
[Parallel + Always Spawn] : All passed Perf
[Parallel + Thread Pool + Spin] : All passed Perf
[Parallel + Thread Pool + Sleep] : All passed Perf

Part B: Supporting Execution of Task Graphs

Implement the task graph on top of the Part A sleeping version.

Since the given dependencies form a DAG, this essentially amounts to a topological sort.

Not much else to say: multi-threaded code looks simple but is brutal to write; practice is the only cure.

I also switched the main thread to sleeping. Performance could still be improved with finer-grained locking or by switching to atomics, but it passes, so I'll leave it here and slack off for the Qingming holiday.
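Before the implementation, a quick usage sketch of the Part B interface: a diamond-shaped dependency graph can be expressed with runAsyncWithDeps and waited on with sync (the four IRunnable instances and the launch size of 64 here are just for illustration):

// Hypothetical diamond DAG: B and C depend on A, D depends on B and C.
//   A -> {B, C} -> D
// Each call launches one bulk set of 64 independent tasks.
void launchDiamond(TaskSystemParallelThreadPoolSleeping &ts,
                   IRunnable *taskA, IRunnable *taskB,
                   IRunnable *taskC, IRunnable *taskD) {
  TaskID a = ts.runAsyncWithDeps(taskA, 64, {});
  TaskID b = ts.runAsyncWithDeps(taskB, 64, {a});  // runs after launch a
  TaskID c = ts.runAsyncWithDeps(taskC, 64, {a});  // runs after launch a
  ts.runAsyncWithDeps(taskD, 64, {b, c});          // runs after b and c
  ts.sync();  // blocks until every launched task has finished
}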

class TaskSystemParallelThreadPoolSleeping : public ITaskSystem {
private:
  // One bulk launch in the task graph: its work, its dependency count, and
  // the launches that depend on it.
  struct Task {
    IRunnable *runnable;
    int num_total_tasks;
    int current_task;
    int num_deps;
    std::vector<TaskID> successors;
    int num_finished_tasks;
  };

  int next_task_id_;
  std::unordered_map<TaskID, Task> tasks_;  // launches still waiting on deps
  std::list<Task> ready_tasks_;             // launches with no pending deps

  bool end_;
  int num_threads_;
  std::thread *threads_;
  std::mutex mutex_;
  std::condition_variable cv_;

public:
  TaskSystemParallelThreadPoolSleeping(int num_threads);
  ~TaskSystemParallelThreadPoolSleeping();
  const char *name();
  void run(IRunnable *runnable, int num_total_tasks);
  TaskID runAsyncWithDeps(IRunnable *runnable, int num_total_tasks,
                          const std::vector<TaskID> &deps);
  void sync();
};

TaskSystemParallelThreadPoolSleeping::TaskSystemParallelThreadPoolSleeping(
    int num_threads)
    : ITaskSystem(num_threads), next_task_id_(0), end_(false),
      num_threads_(num_threads), threads_(new std::thread[num_threads]) {
  for (int i = 0; i < num_threads_; ++i) {
    threads_[i] = std::thread([&] {
      while (true) {
        std::unique_lock<std::mutex> lock(mutex_);
        while (ready_tasks_.empty()) {
          if (end_) {
            return;
          }
          cv_.wait(lock);
        }

        // Pick a ready launch that still has unclaimed task ids.
        auto it = std::find_if(
            ready_tasks_.begin(), ready_tasks_.end(), [](const Task &task) {
              return task.current_task < task.num_total_tasks;
            });
        if (it != ready_tasks_.end()) {
          int task_id = it->current_task++;
          lock.unlock();

          it->runnable->runTask(task_id, it->num_total_tasks);

          lock.lock();
          if (++it->num_finished_tasks == it->num_total_tasks) {
            // The launch is complete: release successors whose dependency
            // counts drop to zero, then retire it.
            for (TaskID successor : it->successors) {
              if (--tasks_[successor].num_deps == 0) {
                ready_tasks_.push_back(tasks_[successor]);
                tasks_.erase(successor);
              }
            }
            ready_tasks_.erase(it);
            lock.unlock();
            cv_.notify_all();
          }
        }
      }
    });
  }
}

TaskSystemParallelThreadPoolSleeping::~TaskSystemParallelThreadPoolSleeping() {
  std::unique_lock<std::mutex> lock(mutex_);
  end_ = true;
  while (!ready_tasks_.empty()) {
    cv_.wait(lock);
  }
  lock.unlock();
  cv_.notify_all();
  for (int i = 0; i < num_threads_; ++i) {
    threads_[i].join();
  }
  delete[] threads_;
}

void TaskSystemParallelThreadPoolSleeping::run(IRunnable *runnable,
                                               int num_total_tasks) {
  sync();
  runAsyncWithDeps(runnable, num_total_tasks, {});
  sync();
}

TaskID TaskSystemParallelThreadPoolSleeping::runAsyncWithDeps(
    IRunnable *runnable, int num_total_tasks, const std::vector<TaskID> &deps) {
  std::unique_lock<std::mutex> lock(mutex_);
  int task_id = next_task_id_++;
  Task task{runnable, num_total_tasks, 0, 0, {}, 0};
  // A dependency still present in tasks_ has not finished yet: count it and
  // register this launch as its successor.
  for (TaskID dep : deps) {
    auto it = tasks_.find(dep);
    if (it != tasks_.end()) {
      it->second.successors.push_back(task_id);
      task.num_deps++;
    }
  }
  if (task.num_deps == 0) {
    ready_tasks_.push_back(task);
  } else {
    tasks_.emplace(task_id, task);
  }
  lock.unlock();
  cv_.notify_all();
  return task_id;
}

void TaskSystemParallelThreadPoolSleeping::sync() {
  std::unique_lock<std::mutex> lock(mutex_);
  while (!ready_tasks_.empty()) {
    cv_.wait(lock);
  }
}
~/codes/CS149/asst2/part_b (master*) » python ../tests/run_test_harness.py                         mizukicry@S-Terminal
================================================================================
Running task system grading harness... (11 total tests)
- Detected CPU with 16 execution contexts
- Task system configured to use at most 8 threads
================================================================================
================================================================================
Executing test: super_super_light...
Reference binary: ./runtasks_ref_linux
Results for: super_super_light
STUDENT REFERENCE PERF?
[Serial] 6.834 7.3 0.94 (OK)
[Parallel + Always Spawn] 6.825 119.887 0.06 (OK)
[Parallel + Thread Pool + Spin] 6.814 23.872 0.29 (OK)
[Parallel + Thread Pool + Sleep] 57.034 55.198 1.03 (OK)
================================================================================
Executing test: super_light...
Reference binary: ./runtasks_ref_linux
Results for: super_light
STUDENT REFERENCE PERF?
[Serial] 47.009 47.599 0.99 (OK)
[Parallel + Always Spawn] 46.992 110.904 0.42 (OK)
[Parallel + Thread Pool + Spin] 46.815 26.595 1.76 (NOT OK)
[Parallel + Thread Pool + Sleep] 65.454 55.06 1.19 (OK)
================================================================================
Executing test: ping_pong_equal...
Reference binary: ./runtasks_ref_linux
Results for: ping_pong_equal
STUDENT REFERENCE PERF?
[Serial] 760.099 766.185 0.99 (OK)
[Parallel + Always Spawn] 758.077 234.108 3.24 (NOT OK)
[Parallel + Thread Pool + Spin] 762.266 185.371 4.11 (NOT OK)
[Parallel + Thread Pool + Sleep] 187.322 199.304 0.94 (OK)
================================================================================
Executing test: ping_pong_unequal...
Reference binary: ./runtasks_ref_linux
Results for: ping_pong_unequal
STUDENT REFERENCE PERF?
[Serial] 1310.054 1339.176 0.98 (OK)
[Parallel + Always Spawn] 1307.906 314.079 4.16 (NOT OK)
[Parallel + Thread Pool + Spin] 1310.758 257.618 5.09 (NOT OK)
[Parallel + Thread Pool + Sleep] 277.608 273.891 1.01 (OK)
================================================================================
Executing test: recursive_fibonacci...
Reference binary: ./runtasks_ref_linux
Results for: recursive_fibonacci
STUDENT REFERENCE PERF?
[Serial] 714.303 1338.318 0.53 (OK)
[Parallel + Always Spawn] 713.52 214.992 3.32 (NOT OK)
[Parallel + Thread Pool + Spin] 716.245 213.09 3.36 (NOT OK)
[Parallel + Thread Pool + Sleep] 125.783 207.63 0.61 (OK)
================================================================================
Executing test: math_operations_in_tight_for_loop...
Reference binary: ./runtasks_ref_linux
Results for: math_operations_in_tight_for_loop
STUDENT REFERENCE PERF?
[Serial] 500.2 509.677 0.98 (OK)
[Parallel + Always Spawn] 500.693 560.396 0.89 (OK)
[Parallel + Thread Pool + Spin] 500.159 146.873 3.41 (NOT OK)
[Parallel + Thread Pool + Sleep] 325.321 296.901 1.10 (OK)
================================================================================
Executing test: math_operations_in_tight_for_loop_fewer_tasks...
Reference binary: ./runtasks_ref_linux
Results for: math_operations_in_tight_for_loop_fewer_tasks
STUDENT REFERENCE PERF?
[Serial] 497.63 508.337 0.98 (OK)
[Parallel + Always Spawn] 498.427 561.564 0.89 (OK)
[Parallel + Thread Pool + Spin] 498.512 164.252 3.04 (NOT OK)
[Parallel + Thread Pool + Sleep] 322.417 306.755 1.05 (OK)
================================================================================
Executing test: math_operations_in_tight_for_loop_fan_in...
Reference binary: ./runtasks_ref_linux
Results for: math_operations_in_tight_for_loop_fan_in
STUDENT REFERENCE PERF?
[Serial] 258.782 263.005 0.98 (OK)
[Parallel + Always Spawn] 258.566 97.331 2.66 (NOT OK)
[Parallel + Thread Pool + Spin] 258.325 61.301 4.21 (NOT OK)
[Parallel + Thread Pool + Sleep] 74.849 74.952 1.00 (OK)
================================================================================
Executing test: math_operations_in_tight_for_loop_reduction_tree...
Reference binary: ./runtasks_ref_linux
Results for: math_operations_in_tight_for_loop_reduction_tree
STUDENT REFERENCE PERF?
[Serial] 256.317 261.131 0.98 (OK)
[Parallel + Always Spawn] 255.237 56.143 4.55 (NOT OK)
[Parallel + Thread Pool + Spin] 255.617 46.603 5.48 (NOT OK)
[Parallel + Thread Pool + Sleep] 51.275 48.664 1.05 (OK)
================================================================================
Executing test: spin_between_run_calls...
Reference binary: ./runtasks_ref_linux
Results for: spin_between_run_calls
STUDENT REFERENCE PERF?
[Serial] 251.748 473.086 0.53 (OK)
[Parallel + Always Spawn] 251.408 238.691 1.05 (OK)
[Parallel + Thread Pool + Spin] 251.87 283.328 0.89 (OK)
[Parallel + Thread Pool + Sleep] 157.518 238.803 0.66 (OK)
================================================================================
Executing test: mandelbrot_chunked...
Reference binary: ./runtasks_ref_linux
Results for: mandelbrot_chunked
STUDENT REFERENCE PERF?
[Serial] 332.023 322.752 1.03 (OK)
[Parallel + Always Spawn] 332.18 44.198 7.52 (NOT OK)
[Parallel + Thread Pool + Spin] 332.052 45.215 7.34 (NOT OK)
[Parallel + Thread Pool + Sleep] 43.217 43.614 0.99 (OK)
================================================================================
Overall performance results
[Serial] : All passed Perf
[Parallel + Always Spawn] : Perf did not pass all tests
[Parallel + Thread Pool + Spin] : Perf did not pass all tests
[Parallel + Thread Pool + Sleep] : All passed Perf