Skip to content

Commit

Permalink
Improve code
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Feb 15, 2024
1 parent 17bb0cd commit d87dffa
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 185 deletions.
3 changes: 1 addition & 2 deletions bustubx/src/execution/physical_plan/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ impl VolcanoExecutor for PhysicalInsert {
delete_txn_id: 0,
is_deleted: false,
};
// TODO check result
table_heap.insert_tuple(&tuple_meta, &tuple);
table_heap.insert_tuple(&tuple_meta, &tuple)?;
self.insert_rows
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
}
Expand Down
11 changes: 5 additions & 6 deletions bustubx/src/storage/codec/table_page.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::buffer::BUSTUBX_PAGE_SIZE;
use crate::catalog::SchemaRef;
use crate::common::rid::Rid;
use crate::common::util::page_bytes_to_array;
use crate::storage::codec::{CommonCodec, DecodedData};
use crate::storage::table_page::{TablePageHeader, TupleInfo};
use crate::storage::{TablePage, TupleMeta};
Expand All @@ -24,14 +25,12 @@ impl TablePageCodec {
bytes.len()
)));
}
let (header, offset) = TablePageHeaderCodec::decode(bytes)?;
let mut data = [0u8; BUSTUBX_PAGE_SIZE];
data.copy_from_slice(&bytes[0..BUSTUBX_PAGE_SIZE]);
let (header, _) = TablePageHeaderCodec::decode(bytes)?;
Ok((
TablePage {
schema,
header,
data,
data: page_bytes_to_array(&bytes[0..BUSTUBX_PAGE_SIZE]),
},
BUSTUBX_PAGE_SIZE,
))
Expand Down Expand Up @@ -175,8 +174,8 @@ mod tests {
};

let mut table_page = TablePage::new(schema.clone(), INVALID_PAGE_ID);
table_page.insert_tuple(&tuple1_meta, &tuple1);
table_page.insert_tuple(&tuple2_meta, &tuple2);
table_page.insert_tuple(&tuple1_meta, &tuple1).unwrap();
table_page.insert_tuple(&tuple2_meta, &tuple2).unwrap();

let (new_page, _) =
TablePageCodec::decode(&TablePageCodec::encode(&table_page), schema.clone()).unwrap();
Expand Down
62 changes: 32 additions & 30 deletions bustubx/src/storage/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,10 @@ impl BPlusTreeIndex {
// 向右分裂出一个新page
let internalkv = self.split(&mut curr_page);

let mut data = [0; BUSTUBX_PAGE_SIZE];
data.copy_from_slice(&BPlusTreePageCodec::encode(&curr_page));

self.buffer_pool_manager.write_page(curr_page_id, data);
self.buffer_pool_manager.write_page(
curr_page_id,
page_bytes_to_array(&BPlusTreePageCodec::encode(&curr_page)),
);
self.buffer_pool_manager
.unpin_page(curr_page_id, true)
.unwrap();
Expand Down Expand Up @@ -177,10 +177,10 @@ impl BPlusTreeIndex {
}
}

let mut data = [0; BUSTUBX_PAGE_SIZE];
data.copy_from_slice(&BPlusTreePageCodec::encode(&curr_page));

self.buffer_pool_manager.write_page(curr_page_id, data);
self.buffer_pool_manager.write_page(
curr_page_id,
page_bytes_to_array(&BPlusTreePageCodec::encode(&curr_page)),
);
self.buffer_pool_manager
.unpin_page(curr_page_id, true)
.unwrap();
Expand Down Expand Up @@ -279,11 +279,12 @@ impl BPlusTreeIndex {
}
};
// 更新兄弟节点
let mut data = [0; BUSTUBX_PAGE_SIZE];
data.copy_from_slice(&BPlusTreePageCodec::encode(&left_sibling_tree_page));

self.buffer_pool_manager
.write_page(left_sibling_page_id, data);
self.buffer_pool_manager.write_page(
left_sibling_page_id,
page_bytes_to_array(&BPlusTreePageCodec::encode(
&left_sibling_tree_page,
)),
);
self.buffer_pool_manager
.unpin_page(left_sibling_page_id, true)
.unwrap();
Expand Down Expand Up @@ -364,11 +365,12 @@ impl BPlusTreeIndex {
}
};
// 更新兄弟节点
let mut data = [0; BUSTUBX_PAGE_SIZE];
data.copy_from_slice(&BPlusTreePageCodec::encode(&right_sibling_tree_page));

self.buffer_pool_manager
.write_page(right_sibling_page_id, data);
self.buffer_pool_manager.write_page(
right_sibling_page_id,
page_bytes_to_array(&BPlusTreePageCodec::encode(
&right_sibling_tree_page,
)),
);
self.buffer_pool_manager
.unpin_page(right_sibling_page_id, true)
.unwrap();
Expand Down Expand Up @@ -443,11 +445,11 @@ impl BPlusTreeIndex {
}
}
};
let mut data = [0; BUSTUBX_PAGE_SIZE];
data.copy_from_slice(&BPlusTreePageCodec::encode(&left_sibling_tree_page));

self.buffer_pool_manager
.write_page(left_sibling_page_id, data);
self.buffer_pool_manager.write_page(
left_sibling_page_id,
page_bytes_to_array(&BPlusTreePageCodec::encode(&left_sibling_tree_page)),
);

// 删除当前页
let deleted_page_id = curr_page_id;
Expand Down Expand Up @@ -539,10 +541,10 @@ impl BPlusTreeIndex {
}
};

let mut data = [0; BUSTUBX_PAGE_SIZE];
data.copy_from_slice(&BPlusTreePageCodec::encode(&curr_page));

self.buffer_pool_manager.write_page(curr_page_id, data);
self.buffer_pool_manager.write_page(
curr_page_id,
page_bytes_to_array(&BPlusTreePageCodec::encode(&curr_page)),
);

// 删除右兄弟页
let deleted_page_id = right_sibling_page_id;
Expand Down Expand Up @@ -591,10 +593,10 @@ impl BPlusTreeIndex {
}
}

let mut data = [0; BUSTUBX_PAGE_SIZE];
data.copy_from_slice(&BPlusTreePageCodec::encode(&curr_page));

self.buffer_pool_manager.write_page(curr_page_id, data);
self.buffer_pool_manager.write_page(
curr_page_id,
page_bytes_to_array(&BPlusTreePageCodec::encode(&curr_page)),
);
self.buffer_pool_manager
.unpin_page(curr_page_id, true)
.unwrap();
Expand Down
138 changes: 60 additions & 78 deletions bustubx/src/storage/table_heap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,15 @@ impl TableHeap {
///
/// Returns:
/// An `Option` containing the `Rid` of the inserted tuple if successful, otherwise `None`.
pub fn insert_tuple(&mut self, meta: &TupleMeta, tuple: &Tuple) -> Option<Rid> {
pub fn insert_tuple(&mut self, meta: &TupleMeta, tuple: &Tuple) -> BustubxResult<Rid> {
let mut last_page_id = self.last_page_id;
let last_page = self
.buffer_pool_manager
.fetch_page(self.last_page_id)
.expect("Can not fetch last page");
let last_page = self.buffer_pool_manager.fetch_page(self.last_page_id)?;

// Loop until a suitable page is found for inserting the tuple
let (mut last_table_page, _) =
TablePageCodec::decode(&last_page.read().unwrap().data, self.schema.clone()).unwrap();
TablePageCodec::decode(&last_page.read().unwrap().data, self.schema.clone())?;
loop {
if last_table_page.get_next_tuple_offset(meta, tuple).is_some() {
if last_table_page.next_tuple_offset(tuple).is_ok() {
break;
}

Expand All @@ -73,10 +70,7 @@ impl TableHeap {
);

// Allocate a new page if no more table pages are available.
let next_page = self
.buffer_pool_manager
.new_page()
.expect("cannot allocate page");
let next_page = self.buffer_pool_manager.new_page()?;
let next_page_id = next_page.read().unwrap().page_id;
let next_table_page = TablePage::new(self.schema.clone(), INVALID_PAGE_ID);
next_page.write().unwrap().data =
Expand All @@ -85,13 +79,11 @@ impl TableHeap {
// Update and release the previous page
last_table_page.header.next_page_id = next_page_id;

let mut data = [0u8; BUSTUBX_PAGE_SIZE];
data.copy_from_slice(&TablePageCodec::encode(&last_table_page));

self.buffer_pool_manager.write_page(last_page_id, data);
self.buffer_pool_manager
.unpin_page(last_page_id, true)
.unwrap();
self.buffer_pool_manager.write_page(
last_page_id,
page_bytes_to_array(&TablePageCodec::encode(&last_table_page)),
);
self.buffer_pool_manager.unpin_page(last_page_id, true)?;

// Update last_page_id.
last_page_id = next_page_id;
Expand All @@ -100,61 +92,45 @@ impl TableHeap {
}

// Insert the tuple into the chosen page
let slot_id = last_table_page.insert_tuple(meta, tuple);
let slot_id = last_table_page.insert_tuple(meta, tuple)?;

let mut data = [0u8; BUSTUBX_PAGE_SIZE];
data.copy_from_slice(&TablePageCodec::encode(&last_table_page));

self.buffer_pool_manager.write_page(last_page_id, data);
self.buffer_pool_manager
.unpin_page(last_page_id, true)
.unwrap();
self.buffer_pool_manager.write_page(
last_page_id,
page_bytes_to_array(&TablePageCodec::encode(&last_table_page)),
);
self.buffer_pool_manager.unpin_page(last_page_id, true)?;

// Map the slot_id to a Rid and return
slot_id.map(|slot_id| Rid::new(last_page_id, slot_id as u32))
Ok(Rid::new(last_page_id, slot_id as u32))
}

pub fn update_tuple_meta(&mut self, meta: &TupleMeta, rid: Rid) {
let page = self
.buffer_pool_manager
.fetch_page(rid.page_id)
.expect("Can not fetch page");
pub fn update_tuple_meta(&mut self, meta: &TupleMeta, rid: Rid) -> BustubxResult<()> {
let page = self.buffer_pool_manager.fetch_page(rid.page_id)?;
let (mut table_page, _) =
TablePageCodec::decode(&page.read().unwrap().data, self.schema.clone()).unwrap();
table_page.update_tuple_meta(meta, &rid);
TablePageCodec::decode(&page.read().unwrap().data, self.schema.clone())?;
table_page.update_tuple_meta(meta, rid.slot_num as u16)?;

page.write().unwrap().data = page_bytes_to_array(&TablePageCodec::encode(&table_page));
self.buffer_pool_manager
.unpin_page(rid.page_id, true)
.unwrap();
self.buffer_pool_manager.unpin_page(rid.page_id, true)?;
Ok(())
}

pub fn get_tuple(&mut self, rid: Rid) -> (TupleMeta, Tuple) {
let page = self
.buffer_pool_manager
.fetch_page(rid.page_id)
.expect("Can not fetch page");
pub fn tuple(&mut self, rid: Rid) -> BustubxResult<(TupleMeta, Tuple)> {
let page = self.buffer_pool_manager.fetch_page(rid.page_id)?;
let (mut table_page, _) =
TablePageCodec::decode(&page.read().unwrap().data, self.schema.clone()).unwrap();
let result = table_page.get_tuple(&rid);
self.buffer_pool_manager
.unpin_page(rid.page_id, false)
.unwrap();
result
TablePageCodec::decode(&page.read().unwrap().data, self.schema.clone())?;
let result = table_page.tuple(rid.slot_num as u16)?;
self.buffer_pool_manager.unpin_page(rid.page_id, false)?;
Ok(result)
}

pub fn get_tuple_meta(&mut self, rid: Rid) -> TupleMeta {
let page = self
.buffer_pool_manager
.fetch_page(rid.page_id)
.expect("Can not fetch page");
pub fn tuple_meta(&mut self, rid: Rid) -> BustubxResult<TupleMeta> {
let page = self.buffer_pool_manager.fetch_page(rid.page_id)?;
let (mut table_page, _) =
TablePageCodec::decode(&page.read().unwrap().data, self.schema.clone()).unwrap();
let result = table_page.get_tuple_meta(&rid);
self.buffer_pool_manager
.unpin_page(rid.page_id, false)
.unwrap();
result
TablePageCodec::decode(&page.read().unwrap().data, self.schema.clone())?;
let result = table_page.tuple_meta(rid.slot_num as u16)?;
self.buffer_pool_manager.unpin_page(rid.page_id, false)?;
Ok(result)
}

pub fn get_first_rid(&mut self) -> Option<Rid> {
Expand Down Expand Up @@ -233,7 +209,7 @@ impl TableIterator {
if self.stop_at.is_some() && rid == self.stop_at.unwrap() {
return None;
}
let result = table_heap.get_tuple(rid);
let result = table_heap.tuple(rid).unwrap();
self.rid = table_heap.get_next_rid(rid);
Some(result)
}
Expand Down Expand Up @@ -281,24 +257,30 @@ mod tests {
is_deleted: false,
};

table_heap.insert_tuple(
&meta,
&Tuple::new(schema.clone(), vec![1i8.into(), 1i16.into()]),
);
table_heap
.insert_tuple(
&meta,
&Tuple::new(schema.clone(), vec![1i8.into(), 1i16.into()]),
)
.unwrap();
assert_eq!(table_heap.first_page_id, 0);
assert_eq!(table_heap.last_page_id, 0);

table_heap.insert_tuple(
&meta,
&Tuple::new(schema.clone(), vec![2i8.into(), 2i16.into()]),
);
table_heap
.insert_tuple(
&meta,
&Tuple::new(schema.clone(), vec![2i8.into(), 2i16.into()]),
)
.unwrap();
assert_eq!(table_heap.first_page_id, 0);
assert_eq!(table_heap.last_page_id, 0);

table_heap.insert_tuple(
&meta,
&Tuple::new(schema.clone(), vec![3i8.into(), 3i16.into()]),
);
table_heap
.insert_tuple(
&meta,
&Tuple::new(schema.clone(), vec![3i8.into(), 3i16.into()]),
)
.unwrap();
assert_eq!(table_heap.first_page_id, 0);
assert_eq!(table_heap.last_page_id, 0);
}
Expand Down Expand Up @@ -340,13 +322,13 @@ mod tests {
)
.unwrap();

let mut meta = table_heap.get_tuple_meta(rid2);
let mut meta = table_heap.tuple_meta(rid2).unwrap();
meta.insert_txn_id = 1;
meta.delete_txn_id = 2;
meta.is_deleted = true;
table_heap.update_tuple_meta(&meta, rid2);
table_heap.update_tuple_meta(&meta, rid2).unwrap();

let meta = table_heap.get_tuple_meta(rid2);
let meta = table_heap.tuple_meta(rid2).unwrap();
assert_eq!(meta.insert_txn_id, 1);
assert_eq!(meta.delete_txn_id, 2);
assert_eq!(meta.is_deleted, true);
Expand Down Expand Up @@ -399,15 +381,15 @@ mod tests {
)
.unwrap();

let (meta, tuple) = table_heap.get_tuple(rid1);
let (meta, tuple) = table_heap.tuple(rid1).unwrap();
assert_eq!(meta, meta1);
assert_eq!(tuple.data, vec![1i8.into(), 1i16.into()]);

let (meta, tuple) = table_heap.get_tuple(rid2);
let (meta, tuple) = table_heap.tuple(rid2).unwrap();
assert_eq!(meta, meta2);
assert_eq!(tuple.data, vec![2i8.into(), 2i16.into()]);

let (meta, tuple) = table_heap.get_tuple(rid3);
let (meta, tuple) = table_heap.tuple(rid3).unwrap();
assert_eq!(meta, meta3);
assert_eq!(tuple.data, vec![3i8.into(), 3i16.into()]);
}
Expand Down
Loading

0 comments on commit d87dffa

Please sign in to comment.