Stanford CS149 (Fall 2023) Parallel Computing
Programming Assignment 2: Scheduling Task Graphs on a Multi-Core CPU
GitHub repo
Part A: Synchronous Bulk Task Launch
创建三种不同的任务执行引擎(task execution engine),难点在于读懂作业要求。
Step 1: Move to a Parallel Task System
这是普通的多线程版本。动态分配任务会带来一些额外的同步开销,但静态分配在各任务运行时间不均匀时容易出现负载不均,所以这里采用动态分配:每个线程通过一个共享计数器领取下一个任务编号。
由于多个线程会竞争同一个计数器,需要加锁或使用原子操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 class TaskSystemParallelSpawn : public ITaskSystem {private : int num_threads_; std::thread *threads_; public : TaskSystemParallelSpawn (int num_threads); ~TaskSystemParallelSpawn (); const char *name () ; void run (IRunnable *runnable, int num_total_tasks) ; TaskID runAsyncWithDeps (IRunnable *runnable, int num_total_tasks, const std::vector<TaskID> &deps) ; void sync () ; }; TaskSystemParallelSpawn::TaskSystemParallelSpawn (int num_threads) : ITaskSystem (num_threads), num_threads (num_threads), threads (new std::thread[num_threads]) {} TaskSystemParallelSpawn::~TaskSystemParallelSpawn () { delete [] threads; } void TaskSystemParallelSpawn::run (IRunnable *runnable, int num_total_tasks) { std::atomic_int task_id (0 ) ; for (int i = 0 ; i < num_threads; ++i) { threads[i] = std::thread ([&] { while (true ) { int id = task_id.fetch_add (1 ); if (id >= num_total_tasks) { break ; } runnable->runTask (id, num_total_tasks); } }); } for (int i = 0 ; i < num_threads; ++i) { threads[i].join (); } }
Step 2: Avoid Frequent Thread Creation Using a Thread Pool
线程池版本:在后台创建 num_threads 个线程,然后等待任务。每次领取任务时我直接加了一把大锁,因为只用原子操作不太好实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 class TaskSystemParallelThreadPoolSpinning : public ITaskSystem {private : int num_threads_; std::thread *threads_; bool end_; IRunnable *runnable_; int num_total_tasks_; int current_task_id_; std::mutex mutex_; std::atomic_int num_finished_task_; public : TaskSystemParallelThreadPoolSpinning (int num_threads); ~TaskSystemParallelThreadPoolSpinning (); const char *name () ; void run (IRunnable *runnable, int num_total_tasks) ; TaskID runAsyncWithDeps (IRunnable *runnable, int num_total_tasks, const std::vector<TaskID> &deps) ; void sync () ; }; TaskSystemParallelThreadPoolSpinning::TaskSystemParallelThreadPoolSpinning ( int num_threads) : ITaskSystem (num_threads), num_threads_ (num_threads), threads_ (new std::thread[num_threads]), end_ (false ), runnable_ (nullptr ) { for (int i = 0 ; i < num_threads; ++i) { threads_[i] = std::thread ([&] { while (true ) { IRunnable *runnable = nullptr ; int task_id; mutex_.lock (); if (runnable_) { runnable = runnable_; task_id = current_task_id_++; if (current_task_id_ >= num_total_tasks_) { runnable_ = nullptr ; } } mutex_.unlock (); if (runnable) { runnable->runTask (task_id, num_total_tasks_); num_finished_task_.fetch_add (1 ); } else if (end_) { break ; } } }); } } TaskSystemParallelThreadPoolSpinning::~TaskSystemParallelThreadPoolSpinning () { end_ = true ; for (int i = 0 ; i < num_threads_; ++i) { threads_[i].join (); } delete [] threads_; } void TaskSystemParallelThreadPoolSpinning::run (IRunnable *runnable, int num_total_tasks) { mutex_.lock (); runnable_ = runnable; num_total_tasks_ = num_total_tasks; current_task_id_ = 0 ; num_finished_task_ = 0 ; mutex_.unlock (); while (num_finished_task_ < num_total_tasks) { } }
Step 3: Put Threads to Sleep When There is Nothing to Do
在上一步的基础上,再用条件变量让空闲的线程睡眠。
这是我第一次真正使用条件变量,只能说这种东西写得少,写起来确实折磨。
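另外,我在主线程中用的是自旋等待(spin),结果被编译器优化成了死循环,最后给相关变量加上 volatile 修饰解决。
注意:volatile 在 C++ 中并不提供线程间同步,多个线程无锁读写同一个普通/volatile 变量仍然是数据竞争(未定义行为);正确的做法是使用 std::atomic,或者在互斥锁保护下用条件变量等待。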
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 class TaskSystemParallelThreadPoolSleeping : public ITaskSystem {private : int num_threads_; std::thread *threads_; bool end_; IRunnable *volatile runnable_; int num_total_tasks_; int current_task_id_; std::mutex mutex_; volatile int num_sleeping_threads_; std::condition_variable cv_; public : TaskSystemParallelThreadPoolSleeping (int num_threads); ~TaskSystemParallelThreadPoolSleeping (); const char *name () ; void run (IRunnable *runnable, int num_total_tasks) ; TaskID runAsyncWithDeps (IRunnable *runnable, int num_total_tasks, const std::vector<TaskID> &deps) ; void sync () ; }; TaskSystemParallelThreadPoolSleeping::TaskSystemParallelThreadPoolSleeping ( int num_threads) : ITaskSystem (num_threads), num_threads_ (num_threads), threads_ (new std::thread[num_threads]), end_ (false ), runnable_ (nullptr ), num_sleeping_threads_ (0 ) { for (int i = 0 ; i < num_threads; ++i) { threads_[i] = std::thread ([&] { while (true ) { if (end_) { break ; } std::unique_lock<std::mutex> lock (mutex_); if (IRunnable *runnable = runnable_) { int task_id = current_task_id_++; if (current_task_id_ >= num_total_tasks_) { runnable_ = nullptr ; } lock.unlock (); runnable->runTask (task_id, num_total_tasks_); } else { num_sleeping_threads_++; cv_.wait (lock); num_sleeping_threads_--; } } }); } } TaskSystemParallelThreadPoolSleeping::~TaskSystemParallelThreadPoolSleeping () { end_ = true ; while (num_sleeping_threads_ > 0 ) { cv_.notify_all (); } for (int i = 0 ; i < num_threads_; ++i) { threads_[i].join (); } delete [] threads_; } void TaskSystemParallelThreadPoolSleeping::run (IRunnable *runnable, int num_total_tasks) { std::unique_lock<std::mutex> lock (mutex_) ; runnable_ = runnable; num_total_tasks_ = num_total_tasks; current_task_id_ = 0 ; lock.unlock (); 
cv_.notify_all (); while (runnable_ || num_sleeping_threads_ < num_threads_) { } }
最终性能测试结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 ~/codes/CS149/asst2/part_a (master*) » python ../tests/run_test_harness.py mizukicry@S-Terminal ================================================================================ Running task system grading harness... (11 total tests) - Detected CPU with 16 execution contexts - Task system configured to use at most 8 threads ================================================================================ ================================================================================ Executing test: super_super_light... Reference binary: ./runtasks_ref_linux Results for: super_super_light STUDENT REFERENCE PERF? [Serial] 6.839 7.323 0.93 (OK) [Parallel + Always Spawn] 107.754 111.695 0.96 (OK) [Parallel + Thread Pool + Spin] 7.095 16.314 0.43 (OK) [Parallel + Thread Pool + Sleep] 57.004 56.313 1.01 (OK) ================================================================================ Executing test: super_light... Reference binary: ./runtasks_ref_linux Results for: super_light STUDENT REFERENCE PERF? [Serial] 46.945 47.279 0.99 (OK) [Parallel + Always Spawn] 108.165 109.2 0.99 (OK) [Parallel + Thread Pool + Spin] 11.698 23.173 0.50 (OK) [Parallel + Thread Pool + Sleep] 53.053 53.36 0.99 (OK) ================================================================================ Executing test: ping_pong_equal... Reference binary: ./runtasks_ref_linux Results for: ping_pong_equal STUDENT REFERENCE PERF? 
[Serial] 756.903 759.672 1.00 (OK) [Parallel + Always Spawn] 198.972 210.784 0.94 (OK) [Parallel + Thread Pool + Spin] 136.811 163.145 0.84 (OK) [Parallel + Thread Pool + Sleep] 154.063 176.665 0.87 (OK) ================================================================================ Executing test: ping_pong_unequal... Reference binary: ./runtasks_ref_linux Results for: ping_pong_unequal STUDENT REFERENCE PERF? [Serial] 1297.036 1328.119 0.98 (OK) [Parallel + Always Spawn] 282.762 286.793 0.99 (OK) [Parallel + Thread Pool + Spin] 223.534 235.168 0.95 (OK) [Parallel + Thread Pool + Sleep] 238.699 252.337 0.95 (OK) ================================================================================ Executing test: recursive_fibonacci... Reference binary: ./runtasks_ref_linux Results for: recursive_fibonacci STUDENT REFERENCE PERF? [Serial] 703.682 1324.785 0.53 (OK) [Parallel + Always Spawn] 116.104 196.208 0.59 (OK) [Parallel + Thread Pool + Spin] 113.556 202.847 0.56 (OK) [Parallel + Thread Pool + Sleep] 115.634 192.843 0.60 (OK) ================================================================================ Executing test: math_operations_in_tight_for_loop... Reference binary: ./runtasks_ref_linux Results for: math_operations_in_tight_for_loop STUDENT REFERENCE PERF? [Serial] 494.335 506.367 0.98 (OK) [Parallel + Always Spawn] 552.84 546.948 1.01 (OK) [Parallel + Thread Pool + Spin] 110.831 129.962 0.85 (OK) [Parallel + Thread Pool + Sleep] 282.095 288.475 0.98 (OK) ================================================================================ Executing test: math_operations_in_tight_for_loop_fewer_tasks... Reference binary: ./runtasks_ref_linux Results for: math_operations_in_tight_for_loop_fewer_tasks STUDENT REFERENCE PERF? 
[Serial] 495.172 505.118 0.98 (OK) [Parallel + Always Spawn] 538.859 541.259 1.00 (OK) [Parallel + Thread Pool + Spin] 127.93 145.016 0.88 (OK) [Parallel + Thread Pool + Sleep] 280.496 290.938 0.96 (OK) ================================================================================ Executing test: math_operations_in_tight_for_loop_fan_in... Reference binary: ./runtasks_ref_linux Results for: math_operations_in_tight_for_loop_fan_in STUDENT REFERENCE PERF? [Serial] 257.69 262.913 0.98 (OK) [Parallel + Always Spawn] 89.402 90.371 0.99 (OK) [Parallel + Thread Pool + Spin] 46.508 51.993 0.89 (OK) [Parallel + Thread Pool + Sleep] 60.783 68.523 0.89 (OK) ================================================================================ Executing test: math_operations_in_tight_for_loop_reduction_tree... Reference binary: ./runtasks_ref_linux Results for: math_operations_in_tight_for_loop_reduction_tree STUDENT REFERENCE PERF? [Serial] 253.9 258.621 0.98 (OK) [Parallel + Always Spawn] 52.508 52.753 1.00 (OK) [Parallel + Thread Pool + Spin] 41.257 42.298 0.98 (OK) [Parallel + Thread Pool + Sleep] 45.557 46.396 0.98 (OK) ================================================================================ Executing test: spin_between_run_calls... Reference binary: ./runtasks_ref_linux Results for: spin_between_run_calls STUDENT REFERENCE PERF? [Serial] 249.726 470.106 0.53 (OK) [Parallel + Always Spawn] 126.316 236.662 0.53 (OK) [Parallel + Thread Pool + Spin] 147.054 268.436 0.55 (OK) [Parallel + Thread Pool + Sleep] 126.533 235.999 0.54 (OK) ================================================================================ Executing test: mandelbrot_chunked... Reference binary: ./runtasks_ref_linux Results for: mandelbrot_chunked STUDENT REFERENCE PERF? 
[Serial] 331.607 322.024 1.03 (OK) [Parallel + Always Spawn] 42.408 42.856 0.99 (OK) [Parallel + Thread Pool + Spin] 42.695 42.811 1.00 (OK) [Parallel + Thread Pool + Sleep] 42.959 43.082 1.00 (OK) ================================================================================ Overall performance results [Serial] : All passed Perf [Parallel + Always Spawn] : All passed Perf [Parallel + Thread Pool + Spin] : All passed Perf [Parallel + Thread Pool + Sleep] : All passed Perf
Part B: Supporting Execution of Task Graphs
在 Part A Sleep 版本的基础上实现 Task Graph。
由于给出的依赖关系构成一个 DAG,本质上就是实现一个拓扑排序:一个 launch 只有在它依赖的所有 launch 都完成之后才能开始执行。
没什么好说的,多线程这东西就是看起来简单写起来要命,只能多练
顺便把主线程也改成了 Sleep。细分锁的粒度、换用原子变量等方式还可以继续优化性能,不过测试已经通过,就暂时这样吧,清明假期摸会鱼。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 class TaskSystemParallelThreadPoolSleeping : public ITaskSystem {private : struct Task { IRunnable *runnable; int num_total_tasks; int current_task; int num_deps; std::vector<TaskID> successors; int num_finished_tasks; }; int next_task_id_; std::unordered_map<TaskID, Task> tasks_; std::list<Task> ready_tasks_; bool end_; int num_threads_; std::thread *threads_; std::mutex mutex_; std::condition_variable cv_; public : TaskSystemParallelThreadPoolSleeping (int num_threads); ~TaskSystemParallelThreadPoolSleeping (); const char *name () ; void run (IRunnable *runnable, int num_total_tasks) ; TaskID runAsyncWithDeps (IRunnable *runnable, int num_total_tasks, const std::vector<TaskID> &deps) ; void sync () ; }; TaskSystemParallelThreadPoolSleeping::TaskSystemParallelThreadPoolSleeping ( int num_threads) : ITaskSystem (num_threads), next_task_id_ (0 ), end_ (false ), num_threads_ (num_threads), threads_ (new std::thread[num_threads]) { for (int i = 0 ; i < num_threads_; ++i) { threads_[i] = std::thread ([&] { while (true ) { std::unique_lock<std::mutex> lock (mutex_); while (ready_tasks_.empty ()) { if (end_) { return ; } cv_.wait (lock); } auto it = std::find_if ( ready_tasks_.begin (), ready_tasks_.end (), [](const Task &task) { return task.current_task < task.num_total_tasks; }); if (it != ready_tasks_.end ()) { int task_id = it->current_task++; lock.unlock (); it->runnable->runTask (task_id, it->num_total_tasks); lock.lock (); if (++it->num_finished_tasks == it->num_total_tasks) { for (TaskID successor : it->successors) { if (--tasks_[successor].num_deps == 0 ) { ready_tasks_.push_back 
(tasks_[successor]); tasks_.erase (successor); } } ready_tasks_.erase (it); lock.unlock (); cv_.notify_all (); } } } }); } } TaskSystemParallelThreadPoolSleeping::~TaskSystemParallelThreadPoolSleeping () { std::unique_lock<std::mutex> lock (mutex_) ; end_ = true ; while (!ready_tasks_.empty ()) { cv_.wait (lock); } lock.unlock (); cv_.notify_all (); for (int i = 0 ; i < num_threads_; ++i) { threads_[i].join (); } delete [] threads_; } void TaskSystemParallelThreadPoolSleeping::run (IRunnable *runnable, int num_total_tasks) { sync (); runAsyncWithDeps (runnable, num_total_tasks, {}); sync (); } TaskID TaskSystemParallelThreadPoolSleeping::runAsyncWithDeps ( IRunnable *runnable, int num_total_tasks, const std::vector<TaskID> &deps) { std::unique_lock<std::mutex> lock (mutex_) ; int task_id = next_task_id_++; Task task{runnable, num_total_tasks, 0 , 0 , {}, 0 }; for (TaskID dep : deps) { auto it = tasks_.find (dep); if (it != tasks_.end ()) { it->second.successors.push_back (task_id); task.num_deps++; } } if (task.num_deps == 0 ) { ready_tasks_.push_back (task); } else { tasks_.emplace (task_id, task); } lock.unlock (); cv_.notify_all (); return task_id; } void TaskSystemParallelThreadPoolSleeping::sync () { std::unique_lock<std::mutex> lock (mutex_) ; while (!ready_tasks_.empty ()) { cv_.wait (lock); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 ~/codes/CS149/asst2/part_b (master*) » python ../tests/run_test_harness.py mizukicry@S-Terminal ================================================================================ Running task system grading harness... (11 total tests) - Detected CPU with 16 execution contexts - Task system configured to use at most 8 threads ================================================================================ ================================================================================ Executing test: super_super_light... Reference binary: ./runtasks_ref_linux Results for: super_super_light STUDENT REFERENCE PERF? [Serial] 6.834 7.3 0.94 (OK) [Parallel + Always Spawn] 6.825 119.887 0.06 (OK) [Parallel + Thread Pool + Spin] 6.814 23.872 0.29 (OK) [Parallel + Thread Pool + Sleep] 57.034 55.198 1.03 (OK) ================================================================================ Executing test: super_light... Reference binary: ./runtasks_ref_linux Results for: super_light STUDENT REFERENCE PERF? [Serial] 47.009 47.599 0.99 (OK) [Parallel + Always Spawn] 46.992 110.904 0.42 (OK) [Parallel + Thread Pool + Spin] 46.815 26.595 1.76 (NOT OK) [Parallel + Thread Pool + Sleep] 65.454 55.06 1.19 (OK) ================================================================================ Executing test: ping_pong_equal... Reference binary: ./runtasks_ref_linux Results for: ping_pong_equal STUDENT REFERENCE PERF? 
[Serial] 760.099 766.185 0.99 (OK) [Parallel + Always Spawn] 758.077 234.108 3.24 (NOT OK) [Parallel + Thread Pool + Spin] 762.266 185.371 4.11 (NOT OK) [Parallel + Thread Pool + Sleep] 187.322 199.304 0.94 (OK) ================================================================================ Executing test: ping_pong_unequal... Reference binary: ./runtasks_ref_linux Results for: ping_pong_unequal STUDENT REFERENCE PERF? [Serial] 1310.054 1339.176 0.98 (OK) [Parallel + Always Spawn] 1307.906 314.079 4.16 (NOT OK) [Parallel + Thread Pool + Spin] 1310.758 257.618 5.09 (NOT OK) [Parallel + Thread Pool + Sleep] 277.608 273.891 1.01 (OK) ================================================================================ Executing test: recursive_fibonacci... Reference binary: ./runtasks_ref_linux Results for: recursive_fibonacci STUDENT REFERENCE PERF? [Serial] 714.303 1338.318 0.53 (OK) [Parallel + Always Spawn] 713.52 214.992 3.32 (NOT OK) [Parallel + Thread Pool + Spin] 716.245 213.09 3.36 (NOT OK) [Parallel + Thread Pool + Sleep] 125.783 207.63 0.61 (OK) ================================================================================ Executing test: math_operations_in_tight_for_loop... Reference binary: ./runtasks_ref_linux Results for: math_operations_in_tight_for_loop STUDENT REFERENCE PERF? [Serial] 500.2 509.677 0.98 (OK) [Parallel + Always Spawn] 500.693 560.396 0.89 (OK) [Parallel + Thread Pool + Spin] 500.159 146.873 3.41 (NOT OK) [Parallel + Thread Pool + Sleep] 325.321 296.901 1.10 (OK) ================================================================================ Executing test: math_operations_in_tight_for_loop_fewer_tasks... Reference binary: ./runtasks_ref_linux Results for: math_operations_in_tight_for_loop_fewer_tasks STUDENT REFERENCE PERF? 
[Serial] 497.63 508.337 0.98 (OK) [Parallel + Always Spawn] 498.427 561.564 0.89 (OK) [Parallel + Thread Pool + Spin] 498.512 164.252 3.04 (NOT OK) [Parallel + Thread Pool + Sleep] 322.417 306.755 1.05 (OK) ================================================================================ Executing test: math_operations_in_tight_for_loop_fan_in... Reference binary: ./runtasks_ref_linux Results for: math_operations_in_tight_for_loop_fan_in STUDENT REFERENCE PERF? [Serial] 258.782 263.005 0.98 (OK) [Parallel + Always Spawn] 258.566 97.331 2.66 (NOT OK) [Parallel + Thread Pool + Spin] 258.325 61.301 4.21 (NOT OK) [Parallel + Thread Pool + Sleep] 74.849 74.952 1.00 (OK) ================================================================================ Executing test: math_operations_in_tight_for_loop_reduction_tree... Reference binary: ./runtasks_ref_linux Results for: math_operations_in_tight_for_loop_reduction_tree STUDENT REFERENCE PERF? [Serial] 256.317 261.131 0.98 (OK) [Parallel + Always Spawn] 255.237 56.143 4.55 (NOT OK) [Parallel + Thread Pool + Spin] 255.617 46.603 5.48 (NOT OK) [Parallel + Thread Pool + Sleep] 51.275 48.664 1.05 (OK) ================================================================================ Executing test: spin_between_run_calls... Reference binary: ./runtasks_ref_linux Results for: spin_between_run_calls STUDENT REFERENCE PERF? [Serial] 251.748 473.086 0.53 (OK) [Parallel + Always Spawn] 251.408 238.691 1.05 (OK) [Parallel + Thread Pool + Spin] 251.87 283.328 0.89 (OK) [Parallel + Thread Pool + Sleep] 157.518 238.803 0.66 (OK) ================================================================================ Executing test: mandelbrot_chunked... Reference binary: ./runtasks_ref_linux Results for: mandelbrot_chunked STUDENT REFERENCE PERF? 
[Serial] 332.023 322.752 1.03 (OK) [Parallel + Always Spawn] 332.18 44.198 7.52 (NOT OK) [Parallel + Thread Pool + Spin] 332.052 45.215 7.34 (NOT OK) [Parallel + Thread Pool + Sleep] 43.217 43.614 0.99 (OK) ================================================================================ Overall performance results [Serial] : All passed Perf [Parallel + Always Spawn] : Perf did not pass all tests [Parallel + Thread Pool + Spin] : Perf did not pass all tests [Parallel + Thread Pool + Sleep] : All passed Perf