Project 目标:实现一个线程安全的 buffer
pool,用于在硬盘和内存之间移动数据
Task 分为三部分:
- Extendible Hash Table
- LRU-K Replacement Policy
- Buffer Pool Manager Instance
可以使用 C++17 的一些容器辅助(但注意 STL 容器不是线程安全的)
Task #1 - Extendible Hash
Table
实现一个可以自动扩张的线程安全 Hash Table
这里的 Extendible Hashing
指的是课件中的算法,不是单纯的可扩张,一开始看代码还没看懂
感觉课件上不是特别清楚,附一篇知乎文章
话说阅读框架好痛苦,经常对不上电波(
首先是 Bucket
的实现,没什么难点
因为 Bucket
只供 ExtendibleHashTable
调用,不需要考虑多线程问题
clang-tidy
经常提示用 STL
函数替代循环,然而真用了也没感觉有多简洁(?)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| template <typename K, typename V> auto ExtendibleHashTable<K, V>::Bucket::Find(const K &key, V &value) -> bool { for (const auto &[k, v] : list_) { if (k == key) { value = v; return true; } } return false; }
template <typename K, typename V> auto ExtendibleHashTable<K, V>::Bucket::Remove(const K &key) -> bool { auto pos = std::find_if(list_.cbegin(), list_.cend(), [&key](const auto &pair) { return pair.first == key; }); if (pos != list_.cend()) { list_.erase(pos); return true; } return false; }
template <typename K, typename V> auto ExtendibleHashTable<K, V>::Bucket::Insert(const K &key, const V &value) -> bool { auto pos = std::find_if(list_.begin(), list_.end(), [&key](const auto &pair) { return pair.first == key; }); if (pos != list_.end()) { pos->second = value; return true; } if (IsFull()) { return false; } list_.emplace_back(key, value); return true; }
|
然后是 ExtendibleHashTable
的实现,对于
Find
和 Remove
直接调用 Bucket
的就行,注意上锁。
比较麻烦的是 Insert
,模拟 Bucket
分裂的时候需要注意一下细节,因为可能一次分裂不完,需要用递归或者循环,如果使用递归则需要用
std::recursive_mutex
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| template <typename K, typename V> auto ExtendibleHashTable<K, V>::Find(const K &key, V &value) -> bool { std::scoped_lock<std::mutex> lock(latch_); return dir_[IndexOf(key)]->Find(key, value); }
template <typename K, typename V> auto ExtendibleHashTable<K, V>::Remove(const K &key) -> bool { std::scoped_lock<std::mutex> lock(latch_); return dir_[IndexOf(key)]->Remove(key); }
template <typename K, typename V> void ExtendibleHashTable<K, V>::Insert(const K &key, const V &value) { std::scoped_lock<std::mutex> lock(latch_); while (true) { const size_t index = IndexOf(key); if (dir_[index]->Insert(key, value)) { return; } const int old_depth = dir_[index]->GetDepth(); if (old_depth == global_depth_) { std::vector<std::shared_ptr<Bucket>> new_dir; ++global_depth_; new_dir.reserve(1 << global_depth_); for (size_t i = 0; i < (1ULL << global_depth_); ++i) { new_dir.push_back(dir_[i & ((1ULL << (global_depth_ - 1)) - 1)]); } dir_.swap(new_dir); } std::shared_ptr<Bucket> old_bucket = dir_[index]; auto bucket0 = std::make_shared<Bucket>(bucket_size_, old_depth + 1); auto bucket1 = std::make_shared<Bucket>(bucket_size_, old_depth + 1); const size_t mask = index & ((1ULL << old_depth) - 1); for (size_t i = 0; i < (1ULL << (global_depth_ - old_depth)); ++i) { dir_[i << old_depth | mask] = (i & 1) == 1 ? bucket1 : bucket0; } RedistributeBucket(old_bucket); ++num_buckets_; } }
template <typename K, typename V> auto ExtendibleHashTable<K, V>::RedistributeBucket(std::shared_ptr<Bucket> bucket) -> void { for (auto &[key, value] : bucket->GetItems()) { const size_t index = IndexOf(key); dir_[index]->Insert(key, value); } }
|
Task #2 - LRU-K Replacement
Policy
实现 LRU 算法的变体:LRU-K
和之前的任务不一样,这个任务自由度更高一些,需要自己加东西
关于怎么维护 frame,我直接用的数组 +
暴力扫描,想要优化的话应该可以用堆优化一下,或者不知道能不能像 LRU 那样
Hash + 链表做到 O(1)?
至于什么算法最好还得看实际数据情况,因为我也看不到数据所以就不实现其他算法了
首先是实现了一个 Frame
类用来辅助,里面就是一个
std::queue
用来存放 k 个时间戳
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| class LRUKReplacer { private: class Frame { public: const size_t k_; bool valid_{false}; bool evictable_{false}; std::queue<size_t> timestamps_;
explicit Frame(const size_t k) : k_(k) {} ~Frame() = default;
void Access(const size_t timestamp) { timestamps_.push(timestamp); if (timestamps_.size() > k_) { timestamps_.pop(); } }
void Reset() { valid_ = false; evictable_ = false; std::queue<size_t>().swap(timestamps_); }
auto Timestamp() const -> size_t { return timestamps_.front(); } auto Full() const -> bool { return timestamps_.size() == k_; } };
std::vector<Frame> frames_; };
|
然后任务要实现的函数都直接模拟就行
吐槽一下这个 frame_id
是从 0 到
replacer_size_
- 1,我一直以为从 1 开始,浪费半天时间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| LRUKReplacer::LRUKReplacer(size_t num_frames, size_t k) : replacer_size_(num_frames), k_(k), frames_(num_frames + 1, Frame(k_)) {}
auto LRUKReplacer::Evict(frame_id_t *frame_id) -> bool { *frame_id = -1;
static auto frame_cmp = [](const Frame &a, const Frame &b) -> bool { return a.Full() == b.Full() ? a.Timestamp() < b.Timestamp() : !a.Full(); };
std::scoped_lock<std::mutex> lock(latch_);
for (size_t i = 0; i < replacer_size_; ++i) { auto &frame = frames_[i]; if (!frame.evictable_) { continue; } if (*frame_id == -1 || frame_cmp(frames_[i], frames_[*frame_id])) { *frame_id = i; } } if (*frame_id == -1) { return false; } frames_[*frame_id].Reset(); --curr_size_; return true; }
void LRUKReplacer::RecordAccess(frame_id_t frame_id) { BUSTUB_ASSERT(frame_id < static_cast<int>(replacer_size_), "frame id is invalid."); std::scoped_lock<std::mutex> lock(latch_); auto &frame = frames_[frame_id]; if (!frame.valid_) { frame.valid_ = true; frame.evictable_ = true; ++curr_size_; } frame.Access(current_timestamp_++); }
void LRUKReplacer::SetEvictable(frame_id_t frame_id, bool set_evictable) { BUSTUB_ASSERT(frame_id < static_cast<int>(replacer_size_), "frame id is invalid."); std::scoped_lock<std::mutex> lock(latch_); auto &frame = frames_[frame_id]; if (!frame.valid_) { return; } if (frame.evictable_ != set_evictable) { frame.evictable_ = set_evictable; curr_size_ += set_evictable ? 1 : -1; } }
void LRUKReplacer::Remove(frame_id_t frame_id) { if (frame_id >= static_cast<int>(replacer_size_)) { return; } std::scoped_lock<std::mutex> lock(latch_); auto &frame = frames_[frame_id]; if (!frame.valid_) { return; } BUSTUB_ASSERT(frame.evictable_, "non-evictable frame is removed."); frame.Reset(); --curr_size_; }
auto LRUKReplacer::Size() -> size_t { std::scoped_lock<std::mutex> lock(latch_); return curr_size_; }
|
Task #3 - Buffer Pool
Manager Instance
利用之前写的 Hash Table 和 LRU-K 完成 Buffer Pool
几个坑点:
FetchPgImp
中若 page_table_
中已经存在则要将 pin_count_
+ 1 并记录访问
UnpinPgImp
中设置 is_dirty_
时只能将
false
设置为 true
,不能改变已经为
true
的值
FlushAllPgsImp
中同样需要忽略
is_dirty_
,可以通过判断 page_id_
是否为
INVALID_PAGE_ID
决定是否写入
其实理论上应该在析构的时候再调用一次
FlushAllPgsImp
?不过试了试过不了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
| auto BufferPoolManagerInstance::NewPgImp(page_id_t *page_id) -> Page * { std::scoped_lock<std::mutex> lock(latch_); frame_id_t frame_id; if (!free_list_.empty()) { frame_id = free_list_.front(); free_list_.pop_front(); } else if (replacer_->Evict(&frame_id)) { auto &page = pages_[frame_id]; page_table_->Remove(page.GetPageId()); if (page.IsDirty()) { disk_manager_->WritePage(page.GetPageId(), page.GetData()); } } else { return nullptr; } auto &page = pages_[frame_id]; *page_id = AllocatePage(); page.ResetMemory(); page.page_id_ = *page_id; page.is_dirty_ = false; page.pin_count_ = 1; page_table_->Insert(*page_id, frame_id); replacer_->RecordAccess(frame_id); replacer_->SetEvictable(frame_id, false); return &page; }
auto BufferPoolManagerInstance::FetchPgImp(page_id_t page_id) -> Page * { std::scoped_lock<std::mutex> lock(latch_); frame_id_t frame_id; if (page_table_->Find(page_id, frame_id)) { auto &page = pages_[frame_id]; ++page.pin_count_; replacer_->RecordAccess(frame_id); replacer_->SetEvictable(frame_id, false); return &page; } if (!free_list_.empty()) { frame_id = free_list_.front(); free_list_.pop_front(); } else if (replacer_->Evict(&frame_id)) { auto &page = pages_[frame_id]; if (page.IsDirty()) { disk_manager_->WritePage(page.GetPageId(), page.GetData()); } page_table_->Remove(page.GetPageId()); } else { return nullptr; } auto &page = pages_[frame_id]; page.ResetMemory(); page.page_id_ = page_id; page.is_dirty_ = false; page.pin_count_ = 1; disk_manager_->ReadPage(page_id, page.GetData()); page_table_->Insert(page_id, frame_id); replacer_->RecordAccess(frame_id); replacer_->SetEvictable(frame_id, false); return &page; }
auto BufferPoolManagerInstance::UnpinPgImp(page_id_t page_id, bool is_dirty) -> bool { std::scoped_lock<std::mutex> lock(latch_); frame_id_t frame_id; if (!page_table_->Find(page_id, frame_id)) { return false; } auto &page = pages_[frame_id]; if (page.pin_count_ == 0) { return false; } if (--page.pin_count_ == 0) { replacer_->SetEvictable(frame_id, true); } page.is_dirty_ |= is_dirty; return true; }
auto BufferPoolManagerInstance::FlushPgImp(page_id_t page_id) -> bool { std::scoped_lock<std::mutex> lock(latch_); frame_id_t frame_id; if (!page_table_->Find(page_id, frame_id)) { return false; } disk_manager_->WritePage(page_id, pages_[frame_id].GetData()); pages_[frame_id].is_dirty_ = false; return true; }
void BufferPoolManagerInstance::FlushAllPgsImp() { std::scoped_lock<std::mutex> lock(latch_); for (Page *page = pages_; page < pages_ + pool_size_; ++page) { if (page->GetPageId() != INVALID_PAGE_ID) { disk_manager_->WritePage(page->GetPageId(), page->GetData()); page->is_dirty_ = false; } } }
auto BufferPoolManagerInstance::DeletePgImp(page_id_t page_id) -> bool { std::scoped_lock<std::mutex> lock(latch_); frame_id_t frame_id; if (!page_table_->Find(page_id, frame_id)) { return true; } auto &page = pages_[frame_id]; if (page.GetPinCount() > 0) { return false; } page_table_->Remove(page_id); replacer_->Remove(frame_id); free_list_.push_back(frame_id); page.ResetMemory(); page.is_dirty_ = false; page.page_id_ = INVALID_PAGE_ID; DeallocatePage(page_id); return true; }
|
毕竟写的都是暴力,测出来时间接近 4s,排在前面的一般都是 2s
左右,其实差距也不大