Skip to content

Commit

Permalink
Add schema to tuple
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Jan 29, 2024
1 parent 601e33e commit f087158
Show file tree
Hide file tree
Showing 9 changed files with 351 additions and 168 deletions.
2 changes: 1 addition & 1 deletion bustubx/src/planner/operator/create_index.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::catalog::{Schema, SchemaRef};
use crate::catalog::SchemaRef;

#[derive(derive_new::new, Debug, Clone)]
pub struct LogicalCreateIndexOperator {
Expand Down
7 changes: 4 additions & 3 deletions bustubx/src/planner/physical_plan/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ impl VolcanoExecutor for PhysicalInsert {
let insert_rows = self.insert_rows.load(std::sync::atomic::Ordering::SeqCst);
self.insert_rows
.store(0, std::sync::atomic::Ordering::SeqCst);
return Some(Tuple::from_values(vec![ScalarValue::Int32(Some(
insert_rows as i32,
))]));
return Some(Tuple::from_values(
Arc::new(self.output_schema()),
vec![ScalarValue::Int32(Some(insert_rows as i32))],
));
}
}

Expand Down
5 changes: 4 additions & 1 deletion bustubx/src/planner/physical_plan/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ impl VolcanoExecutor for PhysicalProject {
for expr in &self.expressions {
new_values.push(expr.evaluate(next_tuple.as_ref(), Some(&self.input.output_schema())));
}
return Some(Tuple::from_values(new_values));
return Some(Tuple::from_values(
Arc::new(self.output_schema()),
new_values,
));
}
}
3 changes: 2 additions & 1 deletion bustubx/src/planner/physical_plan/values.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::sync::atomic::AtomicU32;
use std::sync::Arc;

use crate::catalog::ColumnRef;
use crate::{
Expand Down Expand Up @@ -40,7 +41,7 @@ impl VolcanoExecutor for PhysicalValues {
.fetch_add(1, std::sync::atomic::Ordering::SeqCst) as usize;
if cursor < self.tuples.len() {
let values = self.tuples[cursor].clone();
return Some(Tuple::from_values(values));
return Some(Tuple::from_values(Arc::new(self.output_schema()), values));
} else {
return None;
}
Expand Down
123 changes: 72 additions & 51 deletions bustubx/src/storage/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,10 @@ impl BPlusTreeIndex {

// internal page第一个kv对的key为空
new_internal_page.insert(
Tuple::empty(self.index_metadata.key_schema.fixed_len()),
Tuple::empty(
self.index_metadata.key_schema.clone(),
self.index_metadata.key_schema.fixed_len(),
),
self.root_page_id,
&self.index_metadata.key_schema,
);
Expand Down Expand Up @@ -818,54 +821,65 @@ mod tests {
let temp_dir = TempDir::new().unwrap();
let temp_path = temp_dir.path().join("test.db");

let schema = Arc::new(Schema::new(vec![
Column::new("a".to_string(), DataType::Int8),
Column::new("b".to_string(), DataType::Int16),
]));
let index_metadata = IndexMetadata::new(
"test_index".to_string(),
"test_table".to_string(),
Arc::new(Schema::new(vec![
Column::new("a".to_string(), DataType::Int8),
Column::new("b".to_string(), DataType::Int16),
])),
schema.clone(),
vec![0, 1],
);
let disk_manager = DiskManager::try_new(&temp_path).unwrap();
let buffer_pool_manager = BufferPoolManager::new(1000, Arc::new(disk_manager));
let mut index = BPlusTreeIndex::new(index_metadata, buffer_pool_manager, 2, 3);

index.insert(&Tuple::new(vec![1, 1, 1]), Rid::new(1, 1));
index.insert(&Tuple::new(schema.clone(), vec![1, 1, 1]), Rid::new(1, 1));
assert_eq!(
index.get(&Tuple::new(vec![1, 1, 1])).unwrap(),
index
.get(&Tuple::new(schema.clone(), vec![1, 1, 1]))
.unwrap(),
Rid::new(1, 1)
);
assert_eq!(index.root_page_id, 0);
assert_eq!(index.buffer_pool_manager.replacer.size(), 1);

index.insert(&Tuple::new(vec![2, 2, 2]), Rid::new(2, 2));
index.insert(&Tuple::new(schema.clone(), vec![2, 2, 2]), Rid::new(2, 2));
assert_eq!(
index.get(&Tuple::new(vec![2, 2, 2])).unwrap(),
index
.get(&Tuple::new(schema.clone(), vec![2, 2, 2]))
.unwrap(),
Rid::new(2, 2)
);
assert_eq!(index.root_page_id, 0);
assert_eq!(index.buffer_pool_manager.replacer.size(), 1);

index.insert(&Tuple::new(vec![3, 3, 3]), Rid::new(3, 3));
index.insert(&Tuple::new(schema.clone(), vec![3, 3, 3]), Rid::new(3, 3));
assert_eq!(
index.get(&Tuple::new(vec![3, 3, 3])).unwrap(),
index
.get(&Tuple::new(schema.clone(), vec![3, 3, 3]))
.unwrap(),
Rid::new(3, 3)
);
assert_eq!(index.root_page_id, 2);
assert_eq!(index.buffer_pool_manager.replacer.size(), 3);

index.insert(&Tuple::new(vec![4, 4, 4]), Rid::new(4, 4));
index.insert(&Tuple::new(schema.clone(), vec![4, 4, 4]), Rid::new(4, 4));
assert_eq!(
index.get(&Tuple::new(vec![4, 4, 4])).unwrap(),
index
.get(&Tuple::new(schema.clone(), vec![4, 4, 4]))
.unwrap(),
Rid::new(4, 4)
);
assert_eq!(index.root_page_id, 2);
assert_eq!(index.buffer_pool_manager.replacer.size(), 4);

index.insert(&Tuple::new(vec![5, 5, 5]), Rid::new(5, 5));
index.insert(&Tuple::new(schema.clone(), vec![5, 5, 5]), Rid::new(5, 5));
assert_eq!(
index.get(&Tuple::new(vec![5, 5, 5])).unwrap(),
index
.get(&Tuple::new(schema.clone(), vec![5, 5, 5]))
.unwrap(),
Rid::new(5, 5)
);
assert_eq!(index.root_page_id, 6);
Expand All @@ -877,86 +891,93 @@ mod tests {
let temp_dir = TempDir::new().unwrap();
let temp_path = temp_dir.path().join("test.db");

let schema = Arc::new(Schema::new(vec![
Column::new("a".to_string(), DataType::Int8),
Column::new("b".to_string(), DataType::Int16),
]));
let index_metadata = IndexMetadata::new(
"test_index".to_string(),
"test_table".to_string(),
Arc::new(Schema::new(vec![
Column::new("a".to_string(), DataType::Int8),
Column::new("b".to_string(), DataType::Int16),
])),
schema.clone(),
vec![0, 1],
);
let disk_manager = DiskManager::try_new(&temp_path).unwrap();
let buffer_pool_manager = BufferPoolManager::new(1000, Arc::new(disk_manager));
let mut index = BPlusTreeIndex::new(index_metadata, buffer_pool_manager, 4, 5);

index.insert(&Tuple::new(vec![1, 1, 1]), Rid::new(1, 1));
index.insert(&Tuple::new(vec![2, 2, 2]), Rid::new(2, 2));
index.insert(&Tuple::new(vec![3, 3, 3]), Rid::new(3, 3));
index.insert(&Tuple::new(vec![4, 4, 4]), Rid::new(4, 4));
index.insert(&Tuple::new(vec![5, 5, 5]), Rid::new(5, 5));
index.insert(&Tuple::new(vec![6, 6, 6]), Rid::new(6, 6));
index.insert(&Tuple::new(vec![7, 7, 7]), Rid::new(7, 7));
index.insert(&Tuple::new(vec![8, 8, 8]), Rid::new(8, 8));
index.insert(&Tuple::new(vec![9, 9, 9]), Rid::new(9, 9));
index.insert(&Tuple::new(vec![10, 10, 10]), Rid::new(10, 10));
index.insert(&Tuple::new(schema.clone(), vec![1, 1, 1]), Rid::new(1, 1));
index.insert(&Tuple::new(schema.clone(), vec![2, 2, 2]), Rid::new(2, 2));
index.insert(&Tuple::new(schema.clone(), vec![3, 3, 3]), Rid::new(3, 3));
index.insert(&Tuple::new(schema.clone(), vec![4, 4, 4]), Rid::new(4, 4));
index.insert(&Tuple::new(schema.clone(), vec![5, 5, 5]), Rid::new(5, 5));
index.insert(&Tuple::new(schema.clone(), vec![6, 6, 6]), Rid::new(6, 6));
index.insert(&Tuple::new(schema.clone(), vec![7, 7, 7]), Rid::new(7, 7));
index.insert(&Tuple::new(schema.clone(), vec![8, 8, 8]), Rid::new(8, 8));
index.insert(&Tuple::new(schema.clone(), vec![9, 9, 9]), Rid::new(9, 9));
index.insert(
&Tuple::new(schema.clone(), vec![10, 10, 10]),
Rid::new(10, 10),
);
assert_eq!(index.buffer_pool_manager.replacer.size(), 5);
assert_eq!(index.root_page_id, 2);
index.print_tree();

index.delete(&Tuple::new(vec![1, 1, 1]));
index.delete(&Tuple::new(schema.clone(), vec![1, 1, 1]));
assert_eq!(index.root_page_id, 2);
assert_eq!(index.get(&Tuple::new(vec![1, 1, 1])), None);
assert_eq!(index.get(&Tuple::new(schema.clone(), vec![1, 1, 1])), None);
assert_eq!(index.buffer_pool_manager.replacer.size(), 4);

index.delete(&Tuple::new(vec![3, 3, 3]));
index.delete(&Tuple::new(schema.clone(), vec![3, 3, 3]));
assert_eq!(index.root_page_id, 2);
assert_eq!(index.get(&Tuple::new(vec![3, 3, 3])), None);
assert_eq!(index.get(&Tuple::new(schema.clone(), vec![3, 3, 3])), None);
assert_eq!(index.buffer_pool_manager.replacer.size(), 4);

index.delete(&Tuple::new(vec![5, 5, 5]));
index.delete(&Tuple::new(schema.clone(), vec![5, 5, 5]));
assert_eq!(index.root_page_id, 2);
assert_eq!(index.get(&Tuple::new(vec![5, 5, 5])), None);
assert_eq!(index.get(&Tuple::new(schema.clone(), vec![5, 5, 5])), None);
assert_eq!(index.buffer_pool_manager.replacer.size(), 4);

index.delete(&Tuple::new(vec![7, 7, 7]));
index.delete(&Tuple::new(schema.clone(), vec![7, 7, 7]));
assert_eq!(index.root_page_id, 2);
assert_eq!(index.get(&Tuple::new(vec![7, 7, 7])), None);
assert_eq!(index.get(&Tuple::new(schema.clone(), vec![7, 7, 7])), None);
assert_eq!(index.buffer_pool_manager.replacer.size(), 4);

index.delete(&Tuple::new(vec![9, 9, 9]));
index.delete(&Tuple::new(schema.clone(), vec![9, 9, 9]));
assert_eq!(index.root_page_id, 2);
assert_eq!(index.get(&Tuple::new(vec![9, 9, 9])), None);
assert_eq!(index.get(&Tuple::new(schema.clone(), vec![9, 9, 9])), None);
assert_eq!(index.buffer_pool_manager.replacer.size(), 3);

index.delete(&Tuple::new(vec![10, 10, 10]));
index.delete(&Tuple::new(schema.clone(), vec![10, 10, 10]));
assert_eq!(index.root_page_id, 2);
assert_eq!(index.get(&Tuple::new(vec![10, 10, 10])), None);
assert_eq!(
index.get(&Tuple::new(schema.clone(), vec![10, 10, 10])),
None
);
assert_eq!(index.buffer_pool_manager.replacer.size(), 3);

index.delete(&Tuple::new(vec![8, 8, 8]));
index.delete(&Tuple::new(schema.clone(), vec![8, 8, 8]));
assert_eq!(index.root_page_id, 0);
assert_eq!(index.get(&Tuple::new(vec![8, 8, 8])), None);
assert_eq!(index.get(&Tuple::new(schema.clone(), vec![8, 8, 8])), None);
assert_eq!(index.buffer_pool_manager.replacer.size(), 1);

index.delete(&Tuple::new(vec![6, 6, 6]));
index.delete(&Tuple::new(schema.clone(), vec![6, 6, 6]));
assert_eq!(index.root_page_id, 0);
assert_eq!(index.get(&Tuple::new(vec![6, 6, 6])), None);
assert_eq!(index.get(&Tuple::new(schema.clone(), vec![6, 6, 6])), None);
assert_eq!(index.buffer_pool_manager.replacer.size(), 1);

index.delete(&Tuple::new(vec![4, 4, 4]));
index.delete(&Tuple::new(schema.clone(), vec![4, 4, 4]));
assert_eq!(index.root_page_id, 0);
assert_eq!(index.get(&Tuple::new(vec![4, 4, 4])), None);
assert_eq!(index.get(&Tuple::new(schema.clone(), vec![4, 4, 4])), None);
assert_eq!(index.buffer_pool_manager.replacer.size(), 1);

index.delete(&Tuple::new(vec![2, 2, 2]));
index.delete(&Tuple::new(schema.clone(), vec![2, 2, 2]));
assert_eq!(index.root_page_id, 0);
assert_eq!(index.get(&Tuple::new(vec![2, 2, 2])), None);
assert_eq!(index.get(&Tuple::new(schema.clone(), vec![2, 2, 2])), None);
assert_eq!(index.buffer_pool_manager.replacer.size(), 1);

index.delete(&Tuple::new(vec![2, 2, 2]));
index.delete(&Tuple::new(schema.clone(), vec![2, 2, 2]));
assert_eq!(index.root_page_id, 0);
assert_eq!(index.get(&Tuple::new(vec![2, 2, 2])), None);
assert_eq!(index.get(&Tuple::new(schema.clone(), vec![2, 2, 2])), None);
assert_eq!(index.buffer_pool_manager.replacer.size(), 1);
}
}
Loading

0 comments on commit f087158

Please sign in to comment.