Project #1 - Buffer Pool

Project 目标:实现一个线程安全的 buffer pool,用于在硬盘和内存之间移动数据

Task 分为三部分:

  • Extendible Hash Table
  • LRU-K Replacement Policy
  • Buffer Pool Manager Instance

可以使用 C++17 的一些容器辅助(但注意 STL 容器不是线程安全的)


Task #1 - Extendible Hash Table

实现一个可以自动扩张的线程安全 Hash Table

这里的 Extendible Hashing 指的是课件中的算法,不是单纯的可扩张,一开始看代码还没看懂

感觉课件上不是特别清楚,附一篇知乎文章

话说阅读框架好痛苦,经常对不上电波(

首先是 Bucket 的实现,没什么难点

因为 Bucket 只供 ExtendibleHashTable 调用,不需要考虑多线程问题

clang-tidy 经常提示用 STL 函数替代循环,然而真用了也没感觉有多简洁(?)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
template <typename K, typename V>
auto ExtendibleHashTable<K, V>::Bucket::Find(const K &key, V &value) -> bool {
for (const auto &[k, v] : list_) {
if (k == key) {
value = v;
return true;
}
}
return false;
}

template <typename K, typename V>
auto ExtendibleHashTable<K, V>::Bucket::Remove(const K &key) -> bool {
auto pos = std::find_if(list_.cbegin(), list_.cend(), [&key](const auto &pair) { return pair.first == key; });
if (pos != list_.cend()) {
list_.erase(pos);
return true;
}
return false;
}

template <typename K, typename V>
auto ExtendibleHashTable<K, V>::Bucket::Insert(const K &key, const V &value) -> bool {
auto pos = std::find_if(list_.begin(), list_.end(), [&key](const auto &pair) { return pair.first == key; });
if (pos != list_.end()) {
pos->second = value;
return true;
}
if (IsFull()) {
return false;
}
list_.emplace_back(key, value);
return true;
}

然后是 ExtendibleHashTable 的实现,对于 FindRemove 直接调用 Bucket 的就行,注意上锁。

比较麻烦的是 Insert,模拟 Bucket 分裂的时候需要注意一下细节,因为可能一次分裂不完,需要用递归或者循环,如果使用递归则需要用 std::recursive_mutex

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
template <typename K, typename V>
auto ExtendibleHashTable<K, V>::Find(const K &key, V &value) -> bool {
std::scoped_lock<std::mutex> lock(latch_);
return dir_[IndexOf(key)]->Find(key, value);
}

template <typename K, typename V>
auto ExtendibleHashTable<K, V>::Remove(const K &key) -> bool {
std::scoped_lock<std::mutex> lock(latch_);
return dir_[IndexOf(key)]->Remove(key);
}

template <typename K, typename V>
void ExtendibleHashTable<K, V>::Insert(const K &key, const V &value) {
std::scoped_lock<std::mutex> lock(latch_);
while (true) {
const size_t index = IndexOf(key);
if (dir_[index]->Insert(key, value)) {
return;
}
const int old_depth = dir_[index]->GetDepth();
if (old_depth == global_depth_) {
std::vector<std::shared_ptr<Bucket>> new_dir;
++global_depth_;
new_dir.reserve(1 << global_depth_);
for (size_t i = 0; i < (1ULL << global_depth_); ++i) {
new_dir.push_back(dir_[i & ((1ULL << (global_depth_ - 1)) - 1)]);
}
dir_.swap(new_dir);
}
std::shared_ptr<Bucket> old_bucket = dir_[index];
auto bucket0 = std::make_shared<Bucket>(bucket_size_, old_depth + 1);
auto bucket1 = std::make_shared<Bucket>(bucket_size_, old_depth + 1);
const size_t mask = index & ((1ULL << old_depth) - 1);
for (size_t i = 0; i < (1ULL << (global_depth_ - old_depth)); ++i) {
dir_[i << old_depth | mask] = (i & 1) == 1 ? bucket1 : bucket0;
}
RedistributeBucket(old_bucket);
++num_buckets_;
}
}

template <typename K, typename V>
auto ExtendibleHashTable<K, V>::RedistributeBucket(std::shared_ptr<Bucket> bucket) -> void {
for (auto &[key, value] : bucket->GetItems()) {
const size_t index = IndexOf(key);
dir_[index]->Insert(key, value);
}
}

Task #2 - LRU-K Replacement Policy

实现 LRU 算法的变体:LRU-K

和之前的任务不一样,这个任务自由度更高一些,需要自己加东西

关于怎么维护 frame,我直接用的数组 + 暴力扫描,想要优化的话应该可以用堆优化一下,或者不知道能不能像 LRU 那样 Hash + 链表做到 O(1)?

至于什么算法最好还得看实际数据情况,因为我也看不到数据所以就不实现其他算法了

首先是实现了一个 Frame 类用来辅助,里面就是一个 std::queue 用来存放 k 个时间戳

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class LRUKReplacer {
private:
class Frame {
public:
const size_t k_;
bool valid_{false};
bool evictable_{false};
std::queue<size_t> timestamps_;

explicit Frame(const size_t k) : k_(k) {}
~Frame() = default;

void Access(const size_t timestamp) {
timestamps_.push(timestamp);
if (timestamps_.size() > k_) {
timestamps_.pop();
}
}

void Reset() {
valid_ = false;
evictable_ = false;
std::queue<size_t>().swap(timestamps_);
}

auto Timestamp() const -> size_t { return timestamps_.front(); }
auto Full() const -> bool { return timestamps_.size() == k_; }
};

std::vector<Frame> frames_;
};

然后任务要实现的函数都直接模拟就行

吐槽一下这个 frame_id 是从 0 到 replacer_size_ - 1,我一直以为从 1 开始,浪费半天时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
LRUKReplacer::LRUKReplacer(size_t num_frames, size_t k)
: replacer_size_(num_frames), k_(k), frames_(num_frames + 1, Frame(k_)) {}

auto LRUKReplacer::Evict(frame_id_t *frame_id) -> bool {
*frame_id = -1;

static auto frame_cmp = [](const Frame &a, const Frame &b) -> bool {
return a.Full() == b.Full() ? a.Timestamp() < b.Timestamp() : !a.Full();
};

std::scoped_lock<std::mutex> lock(latch_);

for (size_t i = 0; i < replacer_size_; ++i) {
auto &frame = frames_[i];
if (!frame.evictable_) {
continue;
}
if (*frame_id == -1 || frame_cmp(frames_[i], frames_[*frame_id])) {
*frame_id = i;
}
}
if (*frame_id == -1) {
return false;
}
frames_[*frame_id].Reset();
--curr_size_;
return true;
}

void LRUKReplacer::RecordAccess(frame_id_t frame_id) {
BUSTUB_ASSERT(frame_id < static_cast<int>(replacer_size_), "frame id is invalid.");
std::scoped_lock<std::mutex> lock(latch_);
auto &frame = frames_[frame_id];
if (!frame.valid_) {
frame.valid_ = true;
frame.evictable_ = true;
++curr_size_;
}
frame.Access(current_timestamp_++);
}

void LRUKReplacer::SetEvictable(frame_id_t frame_id, bool set_evictable) {
BUSTUB_ASSERT(frame_id < static_cast<int>(replacer_size_), "frame id is invalid.");
std::scoped_lock<std::mutex> lock(latch_);
auto &frame = frames_[frame_id];
// BUSTUB_ASSERT(frame.valid_, "frame id is invalid.");
if (!frame.valid_) {
return;
}
if (frame.evictable_ != set_evictable) {
frame.evictable_ = set_evictable;
curr_size_ += set_evictable ? 1 : -1;
}
}

void LRUKReplacer::Remove(frame_id_t frame_id) {
if (frame_id >= static_cast<int>(replacer_size_)) {
return;
}
std::scoped_lock<std::mutex> lock(latch_);
auto &frame = frames_[frame_id];
if (!frame.valid_) {
return;
}
BUSTUB_ASSERT(frame.evictable_, "non-evictable frame is removed.");
frame.Reset();
--curr_size_;
}

auto LRUKReplacer::Size() -> size_t {
std::scoped_lock<std::mutex> lock(latch_);
return curr_size_;
}

Task #3 - Buffer Pool Manager Instance

利用之前写的 Hash Table 和 LRU-K 完成 Buffer Pool

几个坑点:

  • FetchPgImp 中若 page_table_ 中已经存在则要将 pin_count_ + 1 并记录访问
  • UnpinPgImp 中设置 is_dirty_ 时只能将 false 设置为 true,不能改变已经为 true 的值
  • FlushAllPgsImp 中同样需要忽略 is_dirty_,可以通过判断 page_id_ 是否为 INVALID_PAGE_ID 决定是否写入

其实理论上应该在析构的时候再调用一次 FlushAllPgsImp?不过试了试过不了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
auto BufferPoolManagerInstance::NewPgImp(page_id_t *page_id) -> Page * {
std::scoped_lock<std::mutex> lock(latch_);
frame_id_t frame_id;
if (!free_list_.empty()) {
frame_id = free_list_.front();
free_list_.pop_front();
} else if (replacer_->Evict(&frame_id)) {
auto &page = pages_[frame_id];
page_table_->Remove(page.GetPageId());
if (page.IsDirty()) {
disk_manager_->WritePage(page.GetPageId(), page.GetData());
}
} else {
return nullptr;
}
auto &page = pages_[frame_id];
*page_id = AllocatePage();
page.ResetMemory();
page.page_id_ = *page_id;
page.is_dirty_ = false;
page.pin_count_ = 1;
page_table_->Insert(*page_id, frame_id);
replacer_->RecordAccess(frame_id);
replacer_->SetEvictable(frame_id, false);
return &page;
}

auto BufferPoolManagerInstance::FetchPgImp(page_id_t page_id) -> Page * {
std::scoped_lock<std::mutex> lock(latch_);
frame_id_t frame_id;
if (page_table_->Find(page_id, frame_id)) {
auto &page = pages_[frame_id];
++page.pin_count_;
replacer_->RecordAccess(frame_id);
replacer_->SetEvictable(frame_id, false);
return &page;
}
if (!free_list_.empty()) {
frame_id = free_list_.front();
free_list_.pop_front();
} else if (replacer_->Evict(&frame_id)) {
auto &page = pages_[frame_id];
if (page.IsDirty()) {
disk_manager_->WritePage(page.GetPageId(), page.GetData());
}
page_table_->Remove(page.GetPageId());
} else {
return nullptr;
}
auto &page = pages_[frame_id];
page.ResetMemory();
page.page_id_ = page_id;
page.is_dirty_ = false;
page.pin_count_ = 1;
disk_manager_->ReadPage(page_id, page.GetData());
page_table_->Insert(page_id, frame_id);
replacer_->RecordAccess(frame_id);
replacer_->SetEvictable(frame_id, false);
return &page;
}

auto BufferPoolManagerInstance::UnpinPgImp(page_id_t page_id, bool is_dirty) -> bool {
std::scoped_lock<std::mutex> lock(latch_);
frame_id_t frame_id;
if (!page_table_->Find(page_id, frame_id)) {
return false;
}
auto &page = pages_[frame_id];
if (page.pin_count_ == 0) {
return false;
}
if (--page.pin_count_ == 0) {
replacer_->SetEvictable(frame_id, true);
}
page.is_dirty_ |= is_dirty;
return true;
}

auto BufferPoolManagerInstance::FlushPgImp(page_id_t page_id) -> bool {
std::scoped_lock<std::mutex> lock(latch_);
frame_id_t frame_id;
if (!page_table_->Find(page_id, frame_id)) {
return false;
}
disk_manager_->WritePage(page_id, pages_[frame_id].GetData());
pages_[frame_id].is_dirty_ = false;
return true;
}

void BufferPoolManagerInstance::FlushAllPgsImp() {
std::scoped_lock<std::mutex> lock(latch_);
for (Page *page = pages_; page < pages_ + pool_size_; ++page) {
if (page->GetPageId() != INVALID_PAGE_ID) {
disk_manager_->WritePage(page->GetPageId(), page->GetData());
page->is_dirty_ = false;
}
}
}

auto BufferPoolManagerInstance::DeletePgImp(page_id_t page_id) -> bool {
std::scoped_lock<std::mutex> lock(latch_);
frame_id_t frame_id;
if (!page_table_->Find(page_id, frame_id)) {
return true;
}
auto &page = pages_[frame_id];
if (page.GetPinCount() > 0) {
return false;
}
page_table_->Remove(page_id);
replacer_->Remove(frame_id);
free_list_.push_back(frame_id);
page.ResetMemory();
page.is_dirty_ = false;
page.page_id_ = INVALID_PAGE_ID;
DeallocatePage(page_id);
return true;
}

毕竟写的都是暴力,测出来时间接近 4s,排在前面的一般都是 2s 左右,其实差距也不大