Skip to content

Commit

Permalink
Write in batches to RocksDB
Browse files: browse the repository at this point in the history
  • Loading branch information
w4 committed Jan 17, 2024
1 parent 912b88e commit d5e06c5
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 27 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ futures = "0.3"
git2 = "0.18.0"
hex = "0.4"
humantime = "2.1"
itertools = "0.12"
rust-ini = "0.20"
nom = "7.1"
md5 = "0.7"
Expand Down
47 changes: 29 additions & 18 deletions src/database/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use std::{
use anyhow::Context;
use git2::{ErrorCode, Reference, Sort};
use ini::Ini;
use itertools::Itertools;
use rocksdb::WriteBatch;
use time::OffsetDateTime;
use tracing::{error, info, info_span, instrument, warn};

Expand Down Expand Up @@ -212,29 +214,40 @@ fn branch_index_update(
let tree_len = commit_tree.len()?;
let mut seen = false;
let mut i = 0;
for rev in revwalk {
let rev = rev?;
for revs in &revwalk.chunks(250) {
let mut batch = WriteBatch::default();

if let (false, Some(latest_indexed)) = (seen, &latest_indexed) {
if rev.as_bytes() == &*latest_indexed.get().hash {
seen = true;
for rev in revs {
let rev = rev?;

if let (false, Some(latest_indexed)) = (seen, &latest_indexed) {
if rev.as_bytes() == &*latest_indexed.get().hash {
seen = true;
}

continue;
}

continue;
}
seen = true;

seen = true;
if ((i + 1) % 25_000) == 0 {
info!("{} commits ingested", i + 1);
}

if ((i + 1) % 25_000) == 0 {
info!("{} commits ingested", i + 1);
}
let commit = git_repository.find_commit(rev)?;
let author = commit.author();
let committer = commit.committer();

let commit = git_repository.find_commit(rev)?;
let author = commit.author();
let committer = commit.committer();
Commit::new(&commit, &author, &committer).insert(
&commit_tree,
tree_len + i,
&mut batch,
)?;
i += 1;
}

Commit::new(&commit, &author, &committer).insert(&commit_tree, tree_len + i)?;
i += 1;
commit_tree.update_counter(tree_len + i, &mut batch)?;
db.write_without_wal(batch)?;
}

if !seen && !force_reindex {
Expand All @@ -251,8 +264,6 @@ fn branch_index_update(
);
}

commit_tree.update_counter(tree_len + i)?;

Ok(())
}

Expand Down
18 changes: 9 additions & 9 deletions src/database/schema/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{borrow::Cow, ops::Deref, sync::Arc};

use anyhow::Context;
use git2::{Oid, Signature};
use rocksdb::{IteratorMode, ReadOptions};
use rocksdb::{IteratorMode, ReadOptions, WriteBatch};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use time::OffsetDateTime;
use tracing::debug;
Expand Down Expand Up @@ -44,8 +44,8 @@ impl<'a> Commit<'a> {
}
}

pub fn insert(&self, batch: &CommitTree, id: u64) -> anyhow::Result<()> {
batch.insert(id, self)
pub fn insert(&self, tree: &CommitTree, id: u64, tx: &mut WriteBatch) -> anyhow::Result<()> {
tree.insert(id, self, tx)
}
}

Expand All @@ -72,8 +72,8 @@ impl Serialize for CommitHash<'_> {
S: Serializer,
{
match self {
CommitHash::Oid(v) => v.as_bytes().serialize(serializer),
CommitHash::Bytes(v) => v.serialize(serializer),
CommitHash::Oid(v) => serializer.serialize_bytes(v.as_bytes()),
CommitHash::Bytes(v) => serializer.serialize_bytes(v),
}
}
}
Expand Down Expand Up @@ -147,13 +147,13 @@ impl CommitTree {
Ok(())
}

pub fn update_counter(&self, count: u64) -> anyhow::Result<()> {
pub fn update_counter(&self, count: u64, tx: &mut WriteBatch) -> anyhow::Result<()> {
let cf = self
.db
.cf_handle(COMMIT_COUNT_FAMILY)
.context("missing column family")?;

self.db.put_cf(cf, &self.prefix, count.to_be_bytes())?;
tx.put_cf(cf, &self.prefix, count.to_be_bytes());

Ok(())
}
Expand All @@ -173,7 +173,7 @@ impl CommitTree {
Ok(u64::from_be_bytes(out))
}

fn insert(&self, id: u64, commit: &Commit<'_>) -> anyhow::Result<()> {
fn insert(&self, id: u64, commit: &Commit<'_>, tx: &mut WriteBatch) -> anyhow::Result<()> {
let cf = self
.db
.cf_handle(COMMIT_FAMILY)
Expand All @@ -182,7 +182,7 @@ impl CommitTree {
let mut key = self.prefix.to_vec();
key.extend_from_slice(&id.to_be_bytes());

self.db.put_cf(cf, key, bincode::serialize(commit)?)?;
tx.put_cf(cf, key, bincode::serialize(commit)?);

Ok(())
}
Expand Down

0 comments on commit d5e06c5

Please sign in to comment.