Skip to content

Commit

Permalink
refactor DiskManager
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Jan 28, 2024
1 parent 15a198a commit b014db0
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 78 deletions.
18 changes: 9 additions & 9 deletions bustubx/src/buffer/buffer_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl BufferPoolManager {
};

// 从磁盘分配一个页
let new_page_id = self.disk_manager.allocate_page();
let new_page_id = self.disk_manager.allocate_page().unwrap();
self.page_table.insert(new_page_id, frame_id);
let mut new_page = Page::new(new_page_id);
new_page.pin_count = 1;
Expand Down Expand Up @@ -112,7 +112,7 @@ impl BufferPoolManager {
self.page_table.insert(page_id, frame_id);
let mut new_page = Page::new(page_id);
new_page.pin_count = 1;
new_page.data = self.disk_manager.read_page(page_id);
new_page.data = self.disk_manager.read_page(page_id).unwrap();
self.pool[frame_id as usize] = new_page;

self.replacer.record_access(frame_id);
Expand Down Expand Up @@ -151,7 +151,7 @@ impl BufferPoolManager {
self.page_table.insert(page_id, frame_id);
let mut new_page = Page::new(page_id);
new_page.pin_count = 1;
new_page.data = self.disk_manager.read_page(page_id);
new_page.data = self.disk_manager.read_page(page_id).unwrap();
self.pool[frame_id as usize] = new_page;

self.replacer.record_access(frame_id);
Expand Down Expand Up @@ -194,7 +194,7 @@ impl BufferPoolManager {
if self.page_table.contains_key(&page_id) {
let frame_id = self.page_table[&page_id];
let page = &mut self.pool[frame_id as usize];
self.disk_manager.write_page(page_id, &page.data);
self.disk_manager.write_page(page_id, &page.data).unwrap();
page.is_dirty = false;
return true;
} else {
Expand Down Expand Up @@ -229,7 +229,7 @@ impl BufferPoolManager {
self.replacer.remove(frame_id);

// 从磁盘上删除
self.disk_manager.deallocate_page(page_id);
self.disk_manager.deallocate_page(page_id).unwrap();
return true;
}
}
Expand All @@ -243,7 +243,7 @@ mod tests {
let db_path = "./test_buffer_pool_manager_new_page.db";
let _ = remove_file(db_path);

let disk_manager = DiskManager::new(db_path.to_string());
let disk_manager = DiskManager::try_new(&db_path).unwrap();
let mut buffer_pool_manager = BufferPoolManager::new(3, Arc::new(disk_manager));
let page = buffer_pool_manager.new_page().unwrap().clone();
assert_eq!(page.page_id, 0);
Expand Down Expand Up @@ -271,7 +271,7 @@ mod tests {
let db_path = "./test_buffer_pool_manager_unpin_page.db";
let _ = remove_file(db_path);

let disk_manager = DiskManager::new(db_path.to_string());
let disk_manager = DiskManager::try_new(&db_path).unwrap();
let mut buffer_pool_manager = BufferPoolManager::new(3, Arc::new(disk_manager));

let page = buffer_pool_manager.new_page().unwrap();
Expand All @@ -292,7 +292,7 @@ mod tests {
let db_path = "./test_buffer_pool_manager_fetch_page.db";
let _ = remove_file(db_path);

let disk_manager = DiskManager::new(db_path.to_string());
let disk_manager = DiskManager::try_new(&db_path).unwrap();
let mut buffer_pool_manager = BufferPoolManager::new(3, Arc::new(disk_manager));

let page = buffer_pool_manager.new_page().unwrap();
Expand Down Expand Up @@ -321,7 +321,7 @@ mod tests {
let db_path = "./test_buffer_pool_manager_delete_page.db";
let _ = remove_file(db_path);

let disk_manager = DiskManager::new(db_path.to_string());
let disk_manager = DiskManager::try_new(&db_path).unwrap();
let mut buffer_pool_manager = BufferPoolManager::new(3, Arc::new(disk_manager));

let page_id = buffer_pool_manager.new_page().unwrap();
Expand Down
6 changes: 3 additions & 3 deletions bustubx/src/catalog/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ mod tests {
let db_path = "./test_catalog_create_table.db";
let _ = remove_file(db_path);

let disk_manager = disk_manager::DiskManager::new(db_path.to_string());
let disk_manager = disk_manager::DiskManager::try_new(&db_path).unwrap();
let buffer_pool_manager = BufferPoolManager::new(1000, Arc::new(disk_manager));
let mut catalog = super::Catalog::new(buffer_pool_manager);

Expand Down Expand Up @@ -279,7 +279,7 @@ mod tests {
let db_path = "./test_catalog_get_table.db";
let _ = remove_file(db_path);

let disk_manager = disk_manager::DiskManager::new(db_path.to_string());
let disk_manager = disk_manager::DiskManager::try_new(&db_path).unwrap();
let buffer_pool_manager = BufferPoolManager::new(1000, Arc::new(disk_manager));
let mut catalog = super::Catalog::new(buffer_pool_manager);

Expand Down Expand Up @@ -337,7 +337,7 @@ mod tests {
let db_path = "./test_catalog_create_index.db";
let _ = remove_file(db_path);

let disk_manager = disk_manager::DiskManager::new(db_path.to_string());
let disk_manager = disk_manager::DiskManager::try_new(&db_path).unwrap();
let buffer_pool_manager = BufferPoolManager::new(1000, Arc::new(disk_manager));
let mut catalog = super::Catalog::new(buffer_pool_manager);

Expand Down
4 changes: 2 additions & 2 deletions bustubx/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub struct Database {
}
impl Database {
pub fn new_on_disk(db_path: &str) -> Self {
let disk_manager = Arc::new(DiskManager::new(db_path.to_string()));
let disk_manager = Arc::new(DiskManager::try_new(&db_path).unwrap());
let buffer_pool_manager =
BufferPoolManager::new(TABLE_HEAP_BUFFER_POOL_SIZE, disk_manager.clone());
// TODO load catalog from disk
Expand All @@ -37,7 +37,7 @@ impl Database {
let temp_dir = TempDir::new().unwrap();
let temp_path = temp_dir.path().join("test.db");
let _ = std::fs::File::create(temp_path.clone()).unwrap();
let disk_manager = Arc::new(DiskManager::new(temp_path.to_str().unwrap().to_string()));
let disk_manager = Arc::new(DiskManager::try_new(temp_path.to_str().unwrap()).unwrap());
let buffer_pool_manager =
BufferPoolManager::new(TABLE_HEAP_BUFFER_POOL_SIZE, disk_manager.clone());
let catalog = Catalog::new(buffer_pool_manager);
Expand Down
2 changes: 2 additions & 0 deletions bustubx/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ pub enum BustubxError {
NotImplement(String),
#[error("Internal error: {0}")]
Internal(String),
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
}
110 changes: 53 additions & 57 deletions bustubx/src/storage/disk_manager.rs
Original file line number Diff line number Diff line change
@@ -1,87 +1,80 @@
use std::fs::File;
use std::path::Path;
use std::{
io::{Read, Seek, Write},
sync::{atomic::AtomicU32, Mutex, MutexGuard},
};

use crate::common::config::BUSTUBX_PAGE_SIZE;
use crate::error::BustubxResult;

use super::page::PageId;

static EMPTY_PAGE: [u8; BUSTUBX_PAGE_SIZE] = [0; BUSTUBX_PAGE_SIZE];

#[derive(Debug)]
pub struct DiskManager {
pub db_path: String,
pub next_page_id: AtomicU32,
inner: Mutex<Inner>,
}

#[derive(Debug)]
struct Inner {
db_file: std::fs::File,
db_file: Mutex<File>,
}

impl DiskManager {
pub fn new(db_path: String) -> Self {
pub fn try_new(db_path: impl AsRef<Path>) -> BustubxResult<Self> {
// Create a file handle db_file using the OpenOptions struct from the Rust standard library.
// By chaining calls, we set the read, write and create modes of the file.
let db_file = std::fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(&db_path)
.unwrap();
.open(db_path)?;

// Gets the metadata of the database file, including the size of the file.
// Then it divides the file size by the page size.
// Convert the result to a page number PageId.
// Note: This is the next available page number.
let next_page_id = db_file
.metadata()
.unwrap()
.metadata()?
.len()
.div_euclid(BUSTUBX_PAGE_SIZE as u64) as PageId;
println!("Initialized disk_manager next_page_id: {}", next_page_id);

Self {
db_path,
Ok(Self {
next_page_id: AtomicU32::new(next_page_id),
// Use a mutex to wrap the file handle to ensure that only one thread
// can access the file at the same time among multiple threads.
inner: Mutex::new(Inner { db_file }),
}
db_file: Mutex::new(db_file),
})
}

// 读取磁盘指定页的数据
pub fn read_page(&self, page_id: PageId) -> [u8; BUSTUBX_PAGE_SIZE] {
let mut guard = self.inner.lock().unwrap();
pub fn read_page(&self, page_id: PageId) -> BustubxResult<[u8; BUSTUBX_PAGE_SIZE]> {
let mut guard = self.db_file.lock().unwrap();
let mut buf = [0; BUSTUBX_PAGE_SIZE];

// guard.db_file is a file object, set the file pointer to
// the specified position through the .seek(...) method.
// Specifically, locate the file pointer to the starting
// position of the corresponding page.
// Here ... should be a suitable offset.
guard
.db_file
.seek(std::io::SeekFrom::Start(
(page_id as usize * BUSTUBX_PAGE_SIZE) as u64,
))
.unwrap();
guard.seek(std::io::SeekFrom::Start(
(page_id as usize * BUSTUBX_PAGE_SIZE) as u64,
))?;
// Read buf.len() bytes of data from the file, and store the data in the buf array.
guard.db_file.read_exact(&mut buf).unwrap();
guard.read_exact(&mut buf)?;

buf
Ok(buf)
}

// 将数据写入磁盘指定页
pub fn write_page(&self, page_id: PageId, data: &[u8]) {
pub fn write_page(&self, page_id: PageId, data: &[u8]) -> BustubxResult<()> {
assert_eq!(data.len(), BUSTUBX_PAGE_SIZE);
let mut guard = self.inner.lock().unwrap();
Self::_write_page(&mut guard, page_id, data);
let mut guard = self.db_file.lock().unwrap();
Self::write_page_internal(&mut guard, page_id, data)
}

// TODO 使用bitmap管理
pub fn allocate_page(&self) -> PageId {
let mut guard = self.inner.lock().unwrap();
pub fn allocate_page(&self) -> BustubxResult<PageId> {
let mut guard = self.db_file.lock().unwrap();

// Load the current value of next_page_id using atomic load operation.
// Increment the next_page_id by 1 using atomic fetch_add operation.
Expand All @@ -90,62 +83,65 @@ impl DiskManager {
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);

// Write an empty page (all zeros) to the allocated page.
Self::_write_page(&mut guard, page_id, &[0; BUSTUBX_PAGE_SIZE]);
Self::write_page_internal(&mut guard, page_id, &EMPTY_PAGE)?;

page_id
Ok(page_id)
}

pub fn deallocate_page(&self, page_id: PageId) {
pub fn deallocate_page(&self, page_id: PageId) -> BustubxResult<()> {
// TODO 利用pageId或者释放的空间
// TODO 添加单测
let mut guard = self.inner.lock().unwrap();
let mut guard = self.db_file.lock().unwrap();

// Write an empty page (all zeros) to the deallocated page.
// But this page is not deallocated, only data will be written with null or zeros.
Self::_write_page(&mut guard, page_id, &[0; BUSTUBX_PAGE_SIZE]);
Self::write_page_internal(&mut guard, page_id, &EMPTY_PAGE)
}

fn _write_page(guard: &mut MutexGuard<Inner>, page_id: PageId, data: &[u8]) {
fn write_page_internal(
guard: &mut MutexGuard<File>,
page_id: PageId,
data: &[u8],
) -> BustubxResult<()> {
// Seek to the start of the page in the database file and write the data.
guard
.db_file
.seek(std::io::SeekFrom::Start(
(page_id as usize * BUSTUBX_PAGE_SIZE) as u64,
))
.unwrap();
guard.db_file.write_all(data).unwrap();
guard.write_all(data)?;
Ok(())
}

pub fn db_file_len(&self) -> u64 {
let guard = self.inner.lock().unwrap();
guard.db_file.metadata().unwrap().len()
pub fn db_file_len(&self) -> BustubxResult<u64> {
let guard = self.db_file.lock().unwrap();
let meta = guard.metadata()?;
Ok(meta.len())
}
}

#[cfg(test)]
mod tests {
use std::io::{Read, Seek, Write};

use crate::common::config::BUSTUBX_PAGE_SIZE;

#[test]
pub fn test_disk_manager_allocate_page() {
let db_path = "test_disk_manager_allocate_page.db";
let _ = std::fs::remove_file(db_path);

let disk_manager = super::DiskManager::new(db_path.to_string());
let disk_manager = super::DiskManager::try_new(&db_path).unwrap();

let page_id = disk_manager.allocate_page();
let page_id = disk_manager.allocate_page().unwrap();
assert_eq!(page_id, 0);
let page = disk_manager.read_page(page_id);
let page = disk_manager.read_page(page_id).unwrap();
assert_eq!(page, [0; 4096]);

let page_id = disk_manager.allocate_page();
let page_id = disk_manager.allocate_page().unwrap();
assert_eq!(page_id, 1);
let page = disk_manager.read_page(page_id);
let page = disk_manager.read_page(page_id).unwrap();
assert_eq!(page, [0; 4096]);

let db_file_len = disk_manager.db_file_len();
let db_file_len = disk_manager.db_file_len().unwrap();
assert_eq!(db_file_len, 8192);

let _ = std::fs::remove_file(db_path);
Expand All @@ -156,21 +152,21 @@ mod tests {
let db_path = "test_disk_manager_write_page.db";
let _ = std::fs::remove_file(db_path);

let disk_manager = super::DiskManager::new(db_path.to_string());
let disk_manager = super::DiskManager::try_new(&db_path).unwrap();

let page_id1 = disk_manager.allocate_page();
let page_id2 = disk_manager.allocate_page();
let page_id1 = disk_manager.allocate_page().unwrap();
let page_id2 = disk_manager.allocate_page().unwrap();

let mut page1 = vec![1, 2, 3];
page1.extend(vec![0; BUSTUBX_PAGE_SIZE - 3]);
disk_manager.write_page(page_id1, &page1);
let page = disk_manager.read_page(page_id1);
disk_manager.write_page(page_id1, &page1).unwrap();
let page = disk_manager.read_page(page_id1).unwrap();
assert_eq!(page, page1.as_slice());

let mut page2 = vec![0; BUSTUBX_PAGE_SIZE - 3];
page2.extend(vec![1, 2, 3]);
disk_manager.write_page(page_id2, &page2);
let page = disk_manager.read_page(page_id2);
disk_manager.write_page(page_id2, &page2).unwrap();
let page = disk_manager.read_page(page_id2).unwrap();
assert_eq!(page, page2.as_slice());

let _ = std::fs::remove_file(db_path);
Expand Down
4 changes: 2 additions & 2 deletions bustubx/src/storage/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -808,7 +808,7 @@ mod tests {
]),
vec![0, 1],
);
let disk_manager = disk_manager::DiskManager::new(db_path.to_string());
let disk_manager = disk_manager::DiskManager::try_new(&db_path).unwrap();
let buffer_pool_manager = buffer_pool::BufferPoolManager::new(1000, Arc::new(disk_manager));
let mut index = BPlusTreeIndex::new(index_metadata, buffer_pool_manager, 2, 3);

Expand Down Expand Up @@ -869,7 +869,7 @@ mod tests {
]),
vec![0, 1],
);
let disk_manager = disk_manager::DiskManager::new(db_path.to_string());
let disk_manager = disk_manager::DiskManager::try_new(&db_path).unwrap();
let buffer_pool_manager = buffer_pool::BufferPoolManager::new(1000, Arc::new(disk_manager));
let mut index = BPlusTreeIndex::new(index_metadata, buffer_pool_manager, 4, 5);

Expand Down
Loading

0 comments on commit b014db0

Please sign in to comment.