Skip to content

Commit

Permalink
Load catalog data
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Feb 21, 2024
1 parent c6c8542 commit a13a61c
Show file tree
Hide file tree
Showing 10 changed files with 156 additions and 131 deletions.
20 changes: 20 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions bustubx-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,17 @@ fn main() {
}
}
Err(ReadlineError::Interrupted) => {
db.flush().unwrap();
println!("CTRL-C");
break;
}
Err(ReadlineError::Eof) => {
db.flush().unwrap();
println!("CTRL-D");
break;
}
Err(err) => {
db.flush().unwrap();
println!("Error: {:?}", err);
break;
}
Expand Down
3 changes: 2 additions & 1 deletion bustubx/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ log = "0.4.19"
thiserror = "1.0.56"
tempfile = "3"
derive-with = "0.5.0"
strum = { version = "0.26", features = ["derive"]}
strum = { version = "0.26", features = ["derive"]}
dashmap = "5.5.3"
159 changes: 89 additions & 70 deletions bustubx/src/buffer/buffer_pool.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use dashmap::DashMap;
use std::sync::RwLock;
use std::{
collections::{HashMap, VecDeque},

Check warning on line 4 in bustubx/src/buffer/buffer_pool.rs

View workflow job for this annotation

GitHub Actions / Test Suite

unused import: `HashMap`
Expand All @@ -19,47 +20,46 @@ pub const TABLE_HEAP_BUFFER_POOL_SIZE: usize = 100;
pub struct BufferPoolManager {
pool: Vec<Arc<RwLock<Page>>>,
// LRU-K置换算法
pub replacer: LRUKReplacer,
pub replacer: Arc<RwLock<LRUKReplacer>>,
pub disk_manager: Arc<DiskManager>,
// 缓冲池中的页号与frame号的映射
page_table: HashMap<PageId, FrameId>,
page_table: DashMap<PageId, FrameId>,
// 缓冲池中空闲的frame
free_list: VecDeque<FrameId>,
// 缓冲池中的页数
num_pages: usize,
free_list: Arc<RwLock<VecDeque<FrameId>>>,
}
impl BufferPoolManager {
pub fn new(num_pages: usize, disk_manager: Arc<DiskManager>) -> Self {
let mut free_list = VecDeque::with_capacity(num_pages);
let mut pool = vec![];
for i in 0..num_pages {
free_list.push_back(i);
pool.push(Arc::new(RwLock::new(Page::new(0))));
}

Self {
pool: vec![Arc::new(RwLock::new(Page::new(0))); num_pages],
replacer: LRUKReplacer::new(num_pages, 2),
pool,
replacer: Arc::new(RwLock::new(LRUKReplacer::new(num_pages, 2))),
disk_manager,
page_table: HashMap::new(),
free_list,
num_pages,
page_table: DashMap::new(),
free_list: Arc::new(RwLock::new(free_list)),
}
}

// 从缓冲池创建一个新页
pub fn new_page(&mut self) -> BustubxResult<Arc<RwLock<Page>>> {
pub fn new_page(&self) -> BustubxResult<Arc<RwLock<Page>>> {
// 缓冲池已满且无可替换的页
if self.free_list.is_empty() && self.replacer.size() == 0 {
if self.free_list.read().unwrap().is_empty() && self.replacer.read().unwrap().size() == 0 {
return Err(BustubxError::Storage(
"Cannot new page because buffer pool is full and no page to evict".to_string(),
));
}

// 分配一个frame
let frame_id = if !self.free_list.is_empty() {
// 有空闲frame则直接分配
self.free_list.pop_front().unwrap()
let frame_id = if let Some(frame_id) = self.free_list.write().unwrap().pop_front() {
frame_id
} else {
// 无空闲frame,从缓冲池中替换一个页
if let Some(frame_id) = self.replacer.evict() {
if let Some(frame_id) = self.replacer.write().unwrap().evict() {
let evicted_page = self.pool[frame_id].clone();
let evicted_page_id = evicted_page.read().unwrap().page_id;
// 如果页被修改过,则将其写回磁盘
Expand All @@ -79,26 +79,32 @@ impl BufferPoolManager {
self.page_table.insert(new_page_id, frame_id);
let mut new_page = Page::new(new_page_id);
new_page.pin_count = 1;
self.pool[frame_id] = Arc::new(RwLock::new(new_page));
self.pool[frame_id].write().unwrap().replace(new_page);

self.replacer.record_access(frame_id)?;
self.replacer.set_evictable(frame_id, false)?;
self.replacer.write().unwrap().record_access(frame_id)?;
self.replacer
.write()
.unwrap()
.set_evictable(frame_id, false)?;

Ok(self.pool[frame_id].clone())
}

pub fn fetch_page(&mut self, page_id: PageId) -> BustubxResult<Arc<RwLock<Page>>> {
if self.page_table.contains_key(&page_id) {
let frame_id = self.page_table[&page_id];
let page = self.pool[frame_id].clone();
pub fn fetch_page(&self, page_id: PageId) -> BustubxResult<Arc<RwLock<Page>>> {
if let Some(frame_id) = self.page_table.get(&page_id) {
println!("LWZTEST frame_id: {}", *frame_id);
let page = self.pool[*frame_id].clone();
page.write().unwrap().pin_count += 1;
self.replacer.set_evictable(frame_id, false)?;
self.replacer
.write()
.unwrap()
.set_evictable(*frame_id, false)?;
Ok(page)
} else {
// 分配一个frame
let frame_id = if !self.free_list.is_empty() {
self.free_list.pop_front().unwrap()
} else if let Some(frame_id) = self.replacer.evict() {
let frame_id = if let Some(frame_id) = self.free_list.write().unwrap().pop_front() {
frame_id
} else if let Some(frame_id) = self.replacer.write().unwrap().evict() {
let evicted_page = self.pool[frame_id].clone();
let evicted_page_id = evicted_page.read().unwrap().page_id;
let is_dirty = evicted_page.read().unwrap().is_dirty;
Expand All @@ -110,42 +116,47 @@ impl BufferPoolManager {
} else {
return Err(BustubxError::Storage("Failed to evict page".to_string()));
};

// 从磁盘读取页
self.page_table.insert(page_id, frame_id);
let mut new_page = Page::new(page_id);
new_page.pin_count = 1;
new_page.data = self.disk_manager.read_page(page_id).unwrap();
self.pool[frame_id] = Arc::new(RwLock::new(new_page));
new_page.data = self.disk_manager.read_page(page_id)?;
self.pool[frame_id].write().unwrap().replace(new_page);

self.replacer.record_access(frame_id)?;
self.replacer.set_evictable(frame_id, false)?;
self.replacer.write().unwrap().record_access(frame_id)?;
self.replacer
.write()
.unwrap()
.set_evictable(frame_id, false)?;

Ok(self.pool[frame_id].clone())
}
}

pub fn write_page(&mut self, page_id: PageId, data: [u8; BUSTUBX_PAGE_SIZE]) {
if self.page_table.contains_key(&page_id) {
let frame_id = self.page_table[&page_id];
let page = self.pool[frame_id].clone();
pub fn write_page(&self, page_id: PageId, data: [u8; BUSTUBX_PAGE_SIZE]) {
if let Some(frame_id) = self.page_table.get(&page_id) {
let page = self.pool[*frame_id].clone();
page.write().unwrap().data = data;
page.write().unwrap().is_dirty = true;
}
}

// 从缓冲池中取消固定页
pub fn unpin_page(&mut self, page_id: PageId, is_dirty: bool) -> BustubxResult<bool> {
if self.page_table.contains_key(&page_id) {
let frame_id = self.page_table[&page_id];
let page = self.pool[frame_id].clone();
pub fn unpin_page(&self, page_id: PageId, is_dirty: bool) -> BustubxResult<bool> {
if let Some(frame_id) = self.page_table.get(&page_id) {
let page = self.pool[*frame_id].clone();
if page.read().unwrap().pin_count == 0 {
return Ok(false);
}
page.write().unwrap().pin_count -= 1;
page.write().unwrap().is_dirty |= is_dirty;
let pin_count = page.read().unwrap().pin_count;
if pin_count == 0 {
self.replacer.set_evictable(frame_id, true)?;
self.replacer
.write()
.unwrap()
.set_evictable(*frame_id, true)?;
}
Ok(true)
} else {
Expand All @@ -154,10 +165,9 @@ impl BufferPoolManager {
}

// 将缓冲池中指定页写回磁盘
pub fn flush_page(&mut self, page_id: PageId) -> BustubxResult<bool> {
if self.page_table.contains_key(&page_id) {
let frame_id = self.page_table[&page_id];
let page = self.pool[frame_id].clone();
pub fn flush_page(&self, page_id: PageId) -> BustubxResult<bool> {
if let Some(frame_id) = self.page_table.get(&page_id) {
let page = self.pool[*frame_id].clone();
self.disk_manager
.write_page(page_id, &page.read().unwrap().data)?;
page.write().unwrap().is_dirty = false;
Expand All @@ -168,35 +178,38 @@ impl BufferPoolManager {
}

// 将缓冲池中的所有页写回磁盘
pub fn flush_all_pages(&mut self) -> BustubxResult<()> {
let page_ids: Vec<PageId> = self.page_table.keys().copied().collect();
pub fn flush_all_pages(&self) -> BustubxResult<()> {
let page_ids: Vec<PageId> = self.page_table.iter().map(|e| *e.key()).collect();
for page_id in page_ids {
self.flush_page(page_id)?;
}
Ok(())
}

// 删除缓冲池中的页
pub fn delete_page(&mut self, page_id: PageId) -> BustubxResult<bool> {
if !self.page_table.contains_key(&page_id) {
return Ok(true);
}
let frame_id = self.page_table[&page_id];
let page = self.pool[frame_id].clone();
if page.read().unwrap().pin_count > 0 {
// 页被固定,无法删除
return Ok(false);
}
pub fn delete_page(&self, page_id: PageId) -> BustubxResult<bool> {

Check warning on line 190 in bustubx/src/buffer/buffer_pool.rs

View workflow job for this annotation

GitHub Actions / Test Suite

method `delete_page` is never used
if let Some(frame_id_lock) = self.page_table.get(&page_id) {
let frame_id = *frame_id_lock;
drop(frame_id_lock);

// 从缓冲池中删除
page.write().unwrap().destroy();
self.page_table.remove(&page_id);
self.free_list.push_back(frame_id);
self.replacer.remove(frame_id);
let page = self.pool[frame_id].clone();
if page.read().unwrap().pin_count > 0 {
// 页被固定,无法删除
return Ok(false);
}

// 从缓冲池中删除
page.write().unwrap().destroy();
self.page_table.remove(&page_id);
self.free_list.write().unwrap().push_back(frame_id);
self.replacer.write().unwrap().remove(frame_id);

// 从磁盘上删除
self.disk_manager.deallocate_page(page_id)?;
Ok(true)
// 从磁盘上删除
self.disk_manager.deallocate_page(page_id)?;
Ok(true)
} else {
Ok(true)
}
}
}

Expand All @@ -216,9 +229,15 @@ mod tests {
let page1 = buffer_pool.new_page().unwrap().clone();
let page1_id = page1.read().unwrap().page_id;
assert_eq!(buffer_pool.pool[0].read().unwrap().page_id, page1_id,);
assert_eq!(buffer_pool.page_table[&page1.read().unwrap().page_id], 0);
assert_eq!(buffer_pool.free_list.len(), 2);
assert_eq!(buffer_pool.replacer.size(), 0);
assert_eq!(
*buffer_pool
.page_table
.get(&page1.read().unwrap().page_id)
.unwrap(),
0
);
assert_eq!(buffer_pool.free_list.read().unwrap().len(), 2);
assert_eq!(buffer_pool.replacer.read().unwrap().size(), 0);

let page2 = buffer_pool.new_page().unwrap();
let page2_id = page2.read().unwrap().page_id;
Expand Down Expand Up @@ -285,7 +304,7 @@ mod tests {
assert_eq!(page.read().unwrap().page_id, page2_id);
buffer_pool.unpin_page(page2_id, false).unwrap();

assert_eq!(buffer_pool.replacer.size(), 3);
assert_eq!(buffer_pool.replacer.read().unwrap().size(), 3);
}

#[test]
Expand All @@ -311,8 +330,8 @@ mod tests {
let res = buffer_pool.delete_page(page1_id).unwrap();
assert!(res);
assert_eq!(buffer_pool.pool.len(), 3);
assert_eq!(buffer_pool.free_list.len(), 1);
assert_eq!(buffer_pool.replacer.size(), 2);
assert_eq!(buffer_pool.free_list.read().unwrap().len(), 1);
assert_eq!(buffer_pool.replacer.read().unwrap().size(), 2);
assert_eq!(buffer_pool.page_table.len(), 2);

let page = buffer_pool.fetch_page(page1_id).unwrap();
Expand Down
7 changes: 7 additions & 0 deletions bustubx/src/buffer/page.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,11 @@ impl Page {
self.pin_count = 0;
self.is_dirty = false;
}

pub fn replace(&mut self, other: Page) {
self.page_id = other.page_id;
self.data = other.data;
self.pin_count = other.pin_count;
self.is_dirty = other.is_dirty;
}
}
Loading

0 comments on commit a13a61c

Please sign in to comment.