Repo: split RepoEvent::NewDoc into a request and new event
Context: the automerge-repo JS implementation supports a request
workflow for syncing a document we don't have. In this workflow the
requesting peer sends a "request" message, which is identical to the
current sync message except that it is tagged with a different message
type. Responding peers then reply with either a sync message or an
"unavailable" message, which lets the requester distinguish between a
document the other end doesn't have and one it has but which is empty.
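
As an illustration of that workflow, here is a minimal sketch in Rust;
the message and field names are hypothetical stand-ins, not the actual
wire types used by automerge-repo:

```rust
/// Hypothetical message tags, for illustration only; the real
/// protocol types live in the sync layer and are not shown here.
enum Message {
    /// Ordinary sync message for a document the sender already has.
    Sync { document_id: String, sync_payload: Vec<u8> },
    /// Same payload as `Sync`, but tagged so the responder knows the
    /// sender does not yet have a copy of the document.
    Request { document_id: String, sync_payload: Vec<u8> },
    /// Reply to `Request` when the responder has no copy, which lets
    /// the requester distinguish "not found" from "found but empty".
    Unavailable { document_id: String },
}

/// How a responding peer might branch on an incoming message (sketch).
fn respond(msg: &Message, have_document: bool) -> Option<Message> {
    match msg {
        Message::Request { document_id, .. } if !have_document => {
            Some(Message::Unavailable { document_id: document_id.clone() })
        }
        Message::Request { document_id, .. } | Message::Sync { document_id, .. } => {
            // Answer with a regular sync message; the payload is elided here.
            Some(Message::Sync { document_id: document_id.clone(), sync_payload: vec![] })
        }
        Message::Unavailable { .. } => None,
    }
}
```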

Problem: in the repo loop we have no way of telling the difference
between a request for a document we don't have and an announcement of
a document we do have, because both situations are expressed with the
same RepoEvent::NewDoc event.

Solution: split `NewDoc` into separate `RequestDoc` and `NewDoc` events.
This allows us to send request messages for `RequestDoc` and sync
messages for `NewDoc`.
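
A rough sketch of what the split buys the repo loop, using simplified
placeholder types rather than the crate's actual ones (the real events
also carry the document contents and a future resolver, as the diff
below shows):

```rust
/// Simplified stand-in for the real RepoEvent; the actual variants
/// also carry the document and a resolver for the returned future.
enum RepoEvent {
    /// A locally created document that should be announced to peers.
    NewDoc(String),
    /// A document we don't have and want to fetch from peers.
    RequestDoc(String),
}

/// With the split, the repo loop can pick the outgoing message kind
/// directly from the event instead of inspecting document state.
fn outgoing_message_kind(event: &RepoEvent) -> &'static str {
    match event {
        RepoEvent::NewDoc(_) => "sync",        // announce with a sync message
        RepoEvent::RequestDoc(_) => "request", // ask peers with a request message
    }
}
```
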
alexjg committed Dec 14, 2023
1 parent 78a7612 commit 958c54b
Showing 10 changed files with 124 additions and 154 deletions.
2 changes: 1 addition & 1 deletion examples/distributed_bakery.rs
@@ -399,7 +399,7 @@ async fn main() {
}

// The initial document.
let doc_handle = repo_handle.new_document();
let doc_handle = repo_handle.new_document().await;
doc_handle.with_doc_mut(|doc| {
let mut tx = doc.transaction();
reconcile(&mut tx, &bakery).unwrap();
2 changes: 1 addition & 1 deletion examples/tcp-example.rs
@@ -43,7 +43,7 @@ async fn request_doc(State(state): State<Arc<AppState>>, Json(document_id): Json

#[debug_handler]
async fn new_doc(State(state): State<Arc<AppState>>) -> Json<DocumentId> {
let doc_handle = state.repo_handle.new_document();
let doc_handle = state.repo_handle.new_document().await;
let our_id = state.repo_handle.get_repo_id();
doc_handle.with_doc_mut(|doc| {
let mut tx = doc.transaction();
220 changes: 95 additions & 125 deletions src/repo.rs
@@ -134,40 +134,30 @@ impl RepoHandle {
}

/// Create a new document.
pub fn new_document(&self) -> DocHandle {
pub fn new_document(&self) -> impl Future<Output = DocHandle> {
let document_id = DocumentId::random();
let document = new_document();
let doc_info = self.new_document_info(document, DocState::Sync(vec![]));
let handle = DocHandle::new(
self.repo_sender.clone(),
document_id.clone(),
doc_info.document.clone(),
doc_info.handle_count.clone(),
self.repo_id.clone(),
);
let (future, resolver) = new_repo_future_with_resolver();
self.repo_sender
.send(RepoEvent::NewDoc(document_id, doc_info))
.send(RepoEvent::NewDoc(
document_id,
SharedDocument {
automerge: document,
},
resolver,
))
.expect("Failed to send repo event.");
// TODO: return a future to make-up for the unboundedness of the channel.
handle
future
}

/// Boostrap a document, first from storage, and if not found over the network.
pub fn request_document(
&self,
document_id: DocumentId,
) -> RepoFuture<Result<Option<DocHandle>, RepoError>> {
let document = new_document();
) -> impl Future<Output = Result<Option<DocHandle>, RepoError>> {
let (fut, resolver) = new_repo_future_with_resolver();
let doc_info = self.new_document_info(
document,
DocState::Bootstrap {
resolvers: vec![resolver],
storage_fut: None,
},
);
self.repo_sender
.send(RepoEvent::NewDoc(document_id, doc_info))
.send(RepoEvent::RequestDoc(document_id, resolver))
.expect("Failed to send repo event.");
fut
}
@@ -186,15 +176,6 @@ impl RepoHandle {
fut
}

fn new_document_info(&self, document: Automerge, state: DocState) -> DocumentInfo {
let document = SharedDocument {
automerge: document,
};
let document = Arc::new(RwLock::new(document));
let handle_count = Arc::new(AtomicUsize::new(1));
DocumentInfo::new(state, document, handle_count)
}

/// Add a network adapter, representing a connection with a remote repo.
pub fn new_remote_repo(
&self,
@@ -215,7 +196,12 @@
/// Events sent by repo or doc handles to the repo.
pub(crate) enum RepoEvent {
/// Start processing a new document.
NewDoc(DocumentId, DocumentInfo),
NewDoc(DocumentId, SharedDocument, RepoFutureResolver<DocHandle>),
/// Request a document we don't have
RequestDoc(
DocumentId,
RepoFutureResolver<Result<Option<DocHandle>, RepoError>>,
),
/// A document changed.
DocChange(DocumentId),
/// A document was closed(all doc handles dropped).
@@ -246,7 +232,8 @@ pub(crate) enum RepoEvent {
impl fmt::Debug for RepoEvent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RepoEvent::NewDoc(_, _) => f.write_str("RepoEvent::NewDoc"),
RepoEvent::NewDoc(_, _, _) => f.write_str("RepoEvent::NewDoc"),
RepoEvent::RequestDoc(_, _) => f.write_str("RepoEvent::RequestDoc"),
RepoEvent::DocChange(_) => f.write_str("RepoEvent::DocChange"),
RepoEvent::DocClosed(_) => f.write_str("RepoEvent::DocClosed"),
RepoEvent::AddChangeObserver(_, _, _) => f.write_str("RepoEvent::AddChangeObserver"),
@@ -367,17 +354,6 @@ impl DocState {
}
}

fn get_bootstrap_resolvers(
&mut self,
) -> Vec<RepoFutureResolver<Result<Option<DocHandle>, RepoError>>> {
match self {
DocState::Bootstrap { resolvers, .. } => mem::take(resolvers),
_ => unreachable!(
"Trying to get boostrap resolvers from a document that cannot have any."
),
}
}

fn resolve_bootstrap_fut(&mut self, doc_handle: Result<DocHandle, RepoError>) {
match self {
DocState::Bootstrap { resolvers, .. } => {
@@ -468,38 +444,6 @@ impl DocState {
}
}

fn add_boostrap_storage_fut(
&mut self,
fut: BoxFuture<'static, Result<Option<Vec<u8>>, StorageError>>,
) {
match self {
DocState::Bootstrap {
resolvers: _,
ref mut storage_fut,
} => {
assert!(storage_fut.is_none());
*storage_fut = Some(fut);
}
_ => unreachable!(
"Trying to add a boostrap load future for a document that does not need one."
),
}
}

fn add_boostrap_resolvers(
&mut self,
incoming: &mut Vec<RepoFutureResolver<Result<Option<DocHandle>, RepoError>>>,
) {
match self {
DocState::Bootstrap {
ref mut resolvers, ..
} => {
resolvers.append(incoming);
}
_ => unreachable!("Unexpected adding of boostrap resolvers."),
}
}

fn poll_pending_save(&mut self, waker: Arc<RepoWaker>) {
assert!(matches!(*waker, RepoWaker::Storage { .. }));
match self {
@@ -645,10 +589,6 @@ impl DocumentInfo {
}
}

fn is_boostrapping(&self) -> bool {
self.state.is_bootstrapping()
}

fn start_pending_removal(&mut self) {
self.state = match &mut self.state {
DocState::Error | DocState::LoadPending { .. } | DocState::Bootstrap { .. } => {
@@ -1330,61 +1270,91 @@ impl Repo {
fn handle_repo_event(&mut self, event: RepoEvent) {
tracing::trace!(event = ?event, "Handling repo event");
match event {
// TODO: simplify handling of `RepoEvent::NewDoc`.
// `NewDoc` could be broken-up into two events: `RequestDoc` and `NewDoc`,
// the doc info could be created here.
RepoEvent::NewDoc(document_id, mut info) => {
if info.is_boostrapping() {
tracing::trace!("adding bootstrapping document");
if let Some(existing_info) = self.documents.get_mut(&document_id) {
if matches!(existing_info.state, DocState::Bootstrap { .. }) {
let mut resolvers = info.state.get_bootstrap_resolvers();
existing_info.state.add_boostrap_resolvers(&mut resolvers);
} else if matches!(existing_info.state, DocState::Sync(_)) {
existing_info.handle_count.fetch_add(1, Ordering::SeqCst);
let handle = DocHandle::new(
self.repo_sender.clone(),
document_id.clone(),
existing_info.document.clone(),
existing_info.handle_count.clone(),
self.repo_id.clone(),
);
info.state.resolve_bootstrap_fut(Ok(handle));
} else {
tracing::warn!(state=?info.state, "newdoc event received for existing document with incorrect state");
info.state.resolve_bootstrap_fut(Err(RepoError::Incorrect(format!("newdoc event received for existing document with incorrect state: {:?}", info.state))));
}
return;
} else {
RepoEvent::NewDoc(document_id, document, mut resolver) => {
assert!(
self.documents.get(&document_id).is_none(),
"NewDoc event should be sent with a fresh document ID and only be sent once"
);
let shared = Arc::new(RwLock::new(document));
let handle_count = Arc::new(AtomicUsize::new(1));
let info =
DocumentInfo::new(DocState::Sync(vec![]), shared.clone(), handle_count.clone());
self.documents.insert(document_id.clone(), info);
resolver.resolve_fut(DocHandle::new(
self.repo_sender.clone(),
document_id.clone(),
shared.clone(),
handle_count.clone(),
self.repo_id.clone(),
));
Self::enqueue_share_decisions(
self.remote_repos.keys(),
&mut self.pending_share_decisions,
&mut self.share_decisions_to_poll,
self.share_policy.as_ref(),
document_id,
ShareType::Announce,
);
}
RepoEvent::RequestDoc(document_id, mut resolver) => {
let info = self
.documents
.entry(document_id.clone())
.or_insert_with(|| {
let handle_count = Arc::new(AtomicUsize::new(1));
let storage_fut = self.storage.get(document_id.clone());
info.state.add_boostrap_storage_fut(storage_fut);
let mut info = DocumentInfo::new(
DocState::Bootstrap {
resolvers: vec![],
storage_fut: Some(storage_fut),
},
Arc::new(RwLock::new(SharedDocument {
automerge: new_document(),
})),
handle_count.clone(),
);
info.poll_storage_operation(
document_id.clone(),
&self.wake_sender,
&self.repo_sender,
&self.repo_id,
);

let share_type = if info.is_boostrapping() {
Some(ShareType::Request)
} else if info.state.should_sync() {
Some(ShareType::Announce)
} else {
None
};
if let Some(share_type) = share_type {
Self::enqueue_share_decisions(
self.remote_repos.keys(),
&mut self.pending_share_decisions,
&mut self.share_decisions_to_poll,
self.share_policy.as_ref(),
document_id.clone(),
share_type,
);
}
info
});

match &mut info.state {
DocState::Bootstrap { resolvers, .. } => resolvers.push(resolver),
DocState::Sync(_) => {
info.handle_count.fetch_add(1, Ordering::SeqCst);
let handle = DocHandle::new(
self.repo_sender.clone(),
document_id.clone(),
info.document.clone(),
info.handle_count.clone(),
self.repo_id.clone(),
);
resolver.resolve_fut(Ok(Some(handle)));
}
DocState::LoadPending { resolvers, .. } => resolvers.push(resolver),
DocState::PendingRemoval(_) => resolver.resolve_fut(Ok(None)),
DocState::Error => {
resolver.resolve_fut(Err(RepoError::Incorrect(
"request event called for document which is in error state".to_string(),
)));
}
}
self.documents.insert(document_id, info);

if info.state.is_bootstrapping() {
Self::enqueue_share_decisions(
self.remote_repos.keys(),
&mut self.pending_share_decisions,
&mut self.share_decisions_to_poll,
self.share_policy.as_ref(),
document_id.clone(),
ShareType::Request,
);
}
}
RepoEvent::DocChange(doc_id) => {
// Handle doc changes: sync the document.
2 changes: 1 addition & 1 deletion tests/interop/main.rs
@@ -45,7 +45,7 @@ fn sync_two_repos(port: u16) {
});
tracing::trace!("connected conn1");

let doc_handle_repo1 = repo1_handle.new_document();
let doc_handle_repo1 = repo1_handle.new_document().await;
doc_handle_repo1
.with_doc_mut(|doc| {
doc.transact(|tx| {
4 changes: 2 additions & 2 deletions tests/network/document_changed.rs
@@ -22,7 +22,7 @@ async fn test_document_changed_over_sync() {
let repo_handle_2_clone = repo_handle_2.clone();

// Create a document for one repo.
let document_handle_1 = repo_handle_1.new_document();
let document_handle_1 = repo_handle_1.new_document().await;

// Spawn a task that awaits the requested doc handle,
// and then edits the document.
@@ -99,7 +99,7 @@ async fn test_document_changed_locally() {
let expected_repo_id = repo_handle_1.get_repo_id().clone();

// Create a document for the repo.
let doc_handle = repo_handle_1.new_document();
let doc_handle = repo_handle_1.new_document().await;

// spawn a task which edits the document
tokio::spawn({
4 changes: 2 additions & 2 deletions tests/network/document_list.rs
@@ -14,7 +14,7 @@ async fn test_list_all() {
let repo_handle = repo.run();

// Create a document for one repo.
let document_handle = repo_handle.new_document();
let document_handle = repo_handle.new_document().await;
let document_id = document_handle.document_id();

// Edit the document.
@@ -58,7 +58,7 @@ async fn test_list_all_errors_on_shutdown() {
let repo_handle = repo.run();

// Create a document for one repo.
let document_handle = repo_handle.new_document();
let document_handle = repo_handle.new_document().await;
let document_id = document_handle.document_id();

// Edit the document.
4 changes: 2 additions & 2 deletions tests/network/document_load.rs
@@ -15,7 +15,7 @@ async fn test_loading_document_found_immediately() {
let repo_handle = repo.run();

// Create a document for one repo.
let document_handle = repo_handle.new_document();
let document_handle = repo_handle.new_document().await;

// Edit the document.
let doc_data = document_handle.with_doc_mut(|doc| {
@@ -66,7 +66,7 @@ async fn test_loading_document_found_async() {
let repo_handle = repo.run();

// Create a document for one repo.
let document_handle = repo_handle.new_document();
let document_handle = repo_handle.new_document().await;

// Edit the document.
let doc_data = document_handle.with_doc_mut(|doc| {