From 469a35568019b2dfec42226477d4795a9566964b Mon Sep 17 00:00:00 2001 From: Alex Good Date: Wed, 20 Nov 2024 17:41:08 +0000 Subject: [PATCH] Add `Repo::peer_state` to find out about the sync state w.r.t a peer and doc Problem: It's often useful to know what the remote state of a document we are synchronizing is. We have a lot of information about this at the automerge level but it's not exposed to users of `Repo`. Solution: Expose automerge sync state information as well as keep track of when we last received and sent messages. --- src/interfaces.rs | 13 ++ src/lib.rs | 2 +- src/repo.rs | 208 ++++++++++++++++++++-------- tests/network/main.rs | 1 + tests/network/peer_state.rs | 262 ++++++++++++++++++++++++++++++++++++ 5 files changed, 429 insertions(+), 57 deletions(-) create mode 100644 tests/network/peer_state.rs diff --git a/src/interfaces.rs b/src/interfaces.rs index 8c74bfd..af3764c 100644 --- a/src/interfaces.rs +++ b/src/interfaces.rs @@ -3,6 +3,7 @@ use serde::{Deserialize, Serialize}; use std::{ fmt::{Display, Formatter}, str::FromStr, + time::Instant, }; #[derive(Debug, Eq, Hash, PartialEq, Clone)] @@ -193,3 +194,15 @@ pub trait Storage: Send { _full_doc: Vec, ) -> BoxFuture<'static, Result<(), StorageError>>; } + +/// The state of sycnhronization of a document with a remote peer obtained via [`RepoHandle::peer_state`](crate::RepoHandle::peer_state) +pub struct PeerState { + /// When we last received a message from this peer + pub last_received: Option, + /// When we last sent a message to this peer + pub last_sent: Option, + /// The heads of the document when we last sent a message + pub last_sent_heads: Option>, + /// The last heads of the document that the peer said they had + pub last_acked_heads: Option>, +} diff --git a/src/lib.rs b/src/lib.rs index 180c417..155df14 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,7 +8,7 @@ pub use share_policy::{SharePolicy, SharePolicyError}; pub use crate::dochandle::DocHandle; pub use crate::interfaces::{ - DocumentId, Message, NetworkError, RepoId, RepoMessage, Storage, StorageError, + DocumentId, Message, NetworkError, PeerState, RepoId, RepoMessage, Storage, StorageError, }; pub use crate::network_connect::ConnDirection; pub use crate::repo::{Repo, RepoError, RepoHandle}; diff --git a/src/repo.rs b/src/repo.rs index bfafcb9..50cfec4 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -1,5 +1,5 @@ use crate::dochandle::{DocHandle, SharedDocument}; -use crate::interfaces::{DocumentId, RepoId}; +use crate::interfaces::{DocumentId, PeerState, RepoId}; use crate::interfaces::{NetworkError, RepoMessage, Storage, StorageError}; use crate::share_policy::ShareDecision; use crate::{share_policy, SharePolicy, SharePolicyError}; @@ -21,6 +21,7 @@ use std::mem; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::thread::{self, JoinHandle}; +use std::time::Instant; use uuid::Uuid; /// Front-end of the repo. @@ -210,11 +211,27 @@ impl RepoHandle { }) .expect("Failed to send repo event."); } + + pub fn peer_state( + &self, + remote_id: RepoId, + document: DocumentId, + ) -> RepoFuture> { + let (fut, resolver) = new_repo_future_with_resolver(); + self.repo_sender + .send(RepoEvent::GetPeerState { + remote_repo_id: remote_id, + document_id: document, + reply: resolver, + }) + .expect("failed to send repo event"); + fut + } } /// Events sent by repo or doc handles to the repo. pub(crate) enum RepoEvent { - /// Start processing a new document. + /// Start processing a ew document. NewDoc(DocumentId, DocumentInfo), /// A document changed. DocChange(DocumentId), @@ -239,6 +256,11 @@ pub(crate) enum RepoEvent { stream: Box>>, sink: Box>, }, + GetPeerState { + remote_repo_id: RepoId, + document_id: DocumentId, + reply: RepoFutureResolver>, + }, /// Stop the repo. Stop, } @@ -253,6 +275,7 @@ impl fmt::Debug for RepoEvent { RepoEvent::LoadDoc(_, _) => f.write_str("RepoEvent::LoadDoc"), RepoEvent::ListAllDocs(_) => f.write_str("RepoEvent::ListAllDocs"), RepoEvent::ConnectRemoteRepo { .. } => f.write_str("RepoEvent::ConnectRemoteRepo"), + RepoEvent::GetPeerState { .. } => f.write_str("RepoEvent::GetPeerState"), RepoEvent::Stop => f.write_str("RepoEvent::Stop"), } } @@ -573,19 +596,32 @@ pub(crate) struct DocumentInfo { last_heads: Vec, } -/// A state machine representing a connection between a remote repo and a particular document #[derive(Debug)] -enum PeerConnection { - /// we've accepted the peer and are syncing with them - Accepted(SyncState), - /// We're waiting for a response from the share policy - PendingAuth { received_messages: Vec }, +struct PeerConnection { + repo_id: RepoId, + last_recv: Option, + last_send: Option, + state: PeerConnectionState, } impl PeerConnection { - fn pending() -> Self { - PeerConnection::PendingAuth { - received_messages: vec![], + fn pending(repo_id: RepoId) -> Self { + Self { + repo_id, + last_recv: None, + last_send: None, + state: PeerConnectionState::PendingAuth { + received_messages: vec![], + }, + } + } + + fn ready(repo_id: RepoId) -> Self { + Self { + repo_id, + last_recv: None, + last_send: None, + state: PeerConnectionState::Accepted(SyncState::new()), } } @@ -594,23 +630,97 @@ impl PeerConnection { doc: &mut Automerge, msg: SyncMessage, ) -> Result<(), automerge::AutomergeError> { - match self { - PeerConnection::Accepted(sync_state) => doc.receive_sync_message(sync_state, msg), - PeerConnection::PendingAuth { received_messages } => { + self.last_recv = Some(Instant::now()); + match &mut self.state { + PeerConnectionState::Accepted(sync_state) => doc.receive_sync_message(sync_state, msg), + PeerConnectionState::PendingAuth { received_messages } => { received_messages.push(msg); Ok(()) } } } + fn generate_first_sync_message( + &mut self, + document: &mut automerge::Automerge, + ) -> Option { + let msg = match &mut self.state { + PeerConnectionState::PendingAuth { received_messages } => { + let mut sync_state = SyncState::new(); + for msg in received_messages.drain(..) { + document + .receive_sync_message(&mut sync_state, msg) + .expect("Failed to receive sync message."); + } + let message = document.generate_sync_message(&mut sync_state); + self.state = PeerConnectionState::Accepted(sync_state); + message + } + PeerConnectionState::Accepted(sync_state) => document.generate_sync_message(sync_state), + }; + if msg.is_some() { + self.last_send = Some(Instant::now()); + } + msg + } + fn generate_sync_message(&mut self, doc: &Automerge) -> Option { - match self { - Self::Accepted(sync_state) => doc.generate_sync_message(sync_state), - Self::PendingAuth { .. } => None, + let msg = match &mut self.state { + PeerConnectionState::Accepted(sync_state) => doc.generate_sync_message(sync_state), + PeerConnectionState::PendingAuth { .. } => None, + }; + if msg.is_some() { + self.last_send = Some(Instant::now()); + } + msg + } + + fn promote_pending_peer(&mut self) -> Option> { + if let PeerConnectionState::PendingAuth { received_messages } = &mut self.state { + let result = std::mem::take(received_messages); + self.state = PeerConnectionState::Accepted(SyncState::new()); + if !result.is_empty() { + self.last_send = Some(Instant::now()); + } + Some(result) + } else { + tracing::warn!(remote=%self.repo_id, "Tried to promote a peer which was not pending authorization"); + None + } + } + + /// Get the state of synchronization with a remote peer and document + fn peer_state(&self) -> PeerState { + let last_sent_heads = match &self.state { + PeerConnectionState::Accepted(sync_state) => Some(sync_state.last_sent_heads.clone()), + PeerConnectionState::PendingAuth { + received_messages: _, + } => None, + }; + let last_acked_heads = match &self.state { + PeerConnectionState::Accepted(sync_state) => Some(sync_state.shared_heads.clone()), + PeerConnectionState::PendingAuth { + received_messages: _, + } => None, + }; + PeerState { + last_received: self.last_recv, + last_sent: self.last_send, + last_sent_heads, + last_acked_heads, } } } +/// A state machine representing a connection between a remote repo and a particular document +#[derive(Debug)] +enum PeerConnectionState { + /// we've accepted the peer and are syncing with them + Accepted(SyncState), + /// We're waiting for a response from the share policy + PendingAuth { received_messages: Vec }, +} + /// A change requested by a peer connection enum PeerConnCommand { /// Request authorization from the share policy @@ -842,7 +952,7 @@ impl DocumentInfo { Entry::Vacant(entry) => { // if this is a new peer, request authorization commands.push(PeerConnCommand::RequestAuth(repo_id.clone())); - entry.insert(PeerConnection::pending()) + entry.insert(PeerConnection::pending(repo_id.clone())) } Entry::Occupied(entry) => entry.into_mut(), }; @@ -861,48 +971,19 @@ impl DocumentInfo { /// /// Returns any messages which the peer sent while we were waiting for authorization fn promote_pending_peer(&mut self, repo_id: &RepoId) -> Option> { - if let Some(PeerConnection::PendingAuth { received_messages }) = - self.peer_connections.remove(repo_id) - { - self.peer_connections - .insert(repo_id.clone(), PeerConnection::Accepted(SyncState::new())); - Some(received_messages) - } else { - tracing::warn!(remote=%repo_id, "Tried to promote a peer which was not pending authorization"); - None - } + self.peer_connections + .get_mut(repo_id) + .map(|c| c.promote_pending_peer()) + .unwrap_or_default() } /// Potentially generate an outgoing sync message. fn generate_first_sync_message(&mut self, repo_id: RepoId) -> Option { - match self.peer_connections.entry(repo_id) { - Entry::Vacant(entry) => { - let mut sync_state = SyncState::new(); - let document = self.document.read(); - let message = document.automerge.generate_sync_message(&mut sync_state); - entry.insert(PeerConnection::Accepted(sync_state)); - message - } - Entry::Occupied(mut entry) => match entry.get_mut() { - PeerConnection::PendingAuth { received_messages } => { - let mut document = self.document.write(); - let mut sync_state = SyncState::new(); - for msg in received_messages.drain(..) { - document - .automerge - .receive_sync_message(&mut sync_state, msg) - .expect("Failed to receive sync message."); - } - let message = document.automerge.generate_sync_message(&mut sync_state); - entry.insert(PeerConnection::Accepted(sync_state)); - message - } - PeerConnection::Accepted(ref mut sync_state) => { - let document = self.document.read(); - document.automerge.generate_sync_message(sync_state) - } - }, - } + let conn = self + .peer_connections + .entry(repo_id.clone()) + .or_insert_with(|| PeerConnection::ready(repo_id)); + conn.generate_first_sync_message(&mut self.document.write().automerge) } /// Generate outgoing sync message for all repos we are syncing with. @@ -916,6 +997,10 @@ impl DocumentInfo { }) .collect() } + + fn get_peer_state(&self, peer: &RepoId) -> Option { + self.peer_connections.get(peer).map(|p| p.peer_state()) + } } /// Signal that the stream or sink on the network adapter is ready to be polled. @@ -1522,6 +1607,17 @@ impl Repo { self.sinks_to_poll.insert(repo_id.clone()); self.streams_to_poll.insert(repo_id); } + RepoEvent::GetPeerState { + remote_repo_id, + document_id, + mut reply, + } => { + reply.resolve_fut( + self.documents + .get(&document_id) + .and_then(|info| info.get_peer_state(&remote_repo_id)), + ); + } RepoEvent::Stop => { // Handled in the main run loop. } diff --git a/tests/network/main.rs b/tests/network/main.rs index 06a4b4a..dee69ca 100644 --- a/tests/network/main.rs +++ b/tests/network/main.rs @@ -13,6 +13,7 @@ mod document_list; mod document_load; mod document_request; mod document_save; +mod peer_state; use test_log::test; diff --git a/tests/network/peer_state.rs b/tests/network/peer_state.rs new file mode 100644 index 0000000..cff2aed --- /dev/null +++ b/tests/network/peer_state.rs @@ -0,0 +1,262 @@ +extern crate test_utils; + +use std::time::Duration; + +use automerge::transaction::Transactable; +use automerge_repo::{DocHandle, DocumentId, Repo, RepoHandle}; +use test_log::test; +use test_utils::storage_utils::AsyncInMemoryStorage; + +use crate::tincans::connect_repos; + +struct Scenario { + repo_handle_1: RepoHandle, + repo_handle_2: RepoHandle, + document_handle_1: DocHandle, + #[allow(dead_code)] + document_handle_2: DocHandle, + document_id: DocumentId, +} + +async fn scenario() -> Scenario { + let storage = AsyncInMemoryStorage::new(Default::default(), false); + let storage2 = AsyncInMemoryStorage::new(Default::default(), false); + + // Create one repo. + let repo = Repo::new(Some("repo1".to_string()), Box::new(storage.clone())); + let repo_2 = Repo::new(Some("repo2".to_string()), Box::new(storage2.clone())); + let repo_handle_1 = repo.run(); + let repo_handle_2 = repo_2.run(); + connect_repos(&repo_handle_1, &repo_handle_2); + + // Create a document for one repo. + let document_handle_1 = repo_handle_1.new_document(); + let document_id = document_handle_1.document_id(); + + // Edit the document. + document_handle_1.with_doc_mut(|doc| { + let mut tx = doc.transaction(); + tx.put(automerge::ROOT, "foo", "bar") + .expect("Failed to change the document."); + tx.commit(); + }); + + let document_handle_2 = tokio::time::timeout( + Duration::from_millis(100), + repo_handle_2.request_document(document_handle_1.document_id()), + ) + .await + .expect("timed out waiting to fetch document") + .expect("failed to fetch document"); + + Scenario { + repo_handle_1, + repo_handle_2, + document_handle_1, + document_handle_2, + document_id, + } +} + +#[test(tokio::test)] +async fn test_read_peer_state() { + let Scenario { + repo_handle_1, + repo_handle_2, + document_id, + .. + } = scenario().await; + + let peer_state = tokio::time::timeout( + Duration::from_millis(100), + repo_handle_1.peer_state(repo_handle_2.get_repo_id().clone(), document_id), + ) + .await + .expect("timed out getting peer state"); + assert!(peer_state.is_some()); +} + +#[test(tokio::test)] +async fn test_peer_state_last_send() { + let Scenario { + repo_handle_1, + repo_handle_2, + document_handle_1, + .. + } = scenario().await; + + let peer_state = tokio::time::timeout( + Duration::from_millis(100), + repo_handle_1.peer_state( + repo_handle_2.get_repo_id().clone(), + document_handle_1.document_id(), + ), + ) + .await + .expect("timed out getting peer state") + .expect("peer state was none"); + let last_send_before_change = peer_state + .last_sent + .expect("last send before should be some"); + + // Now make a change on doc 1 + document_handle_1.with_doc_mut(|doc| { + let mut tx = doc.transaction(); + tx.put(automerge::ROOT, "foo", "baz") + .expect("Failed to change the document."); + tx.commit(); + }); + + let peer_state_after = tokio::time::timeout( + Duration::from_millis(100), + repo_handle_1.peer_state( + repo_handle_2.get_repo_id().clone(), + document_handle_1.document_id(), + ), + ) + .await + .expect("timed out getting peer state") + .expect("peer state was none"); + + let last_sent_after_change = peer_state_after + .last_sent + .expect("last send after should be some"); + assert!(last_sent_after_change > last_send_before_change); +} + +#[test(tokio::test)] +async fn test_peer_state_last_recv() { + let Scenario { + repo_handle_1, + repo_handle_2, + document_handle_1, + document_id, + .. + } = scenario().await; + + // Get the peer state on repo 2 + let peer_state = tokio::time::timeout( + Duration::from_millis(100), + repo_handle_2.peer_state(repo_handle_1.get_repo_id().clone(), document_id.clone()), + ) + .await + .expect("timed out getting peer state") + .expect("peer state was none"); + let last_recv_before_change = peer_state + .last_received + .expect("last recv before should be some"); + + // Now make a change on repo 1 + document_handle_1.with_doc_mut(|doc| { + let mut tx = doc.transaction(); + tx.put(automerge::ROOT, "foo", "baz") + .expect("Failed to change the document."); + tx.commit(); + }); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let peer_state_after = tokio::time::timeout( + Duration::from_millis(100), + repo_handle_2.peer_state(repo_handle_1.get_repo_id().clone(), document_id), + ) + .await + .expect("timed out getting peer state") + .expect("peer state was none"); + + let last_received_after_change = peer_state_after + .last_received + .expect("last received after should be some"); + assert!(last_received_after_change > last_recv_before_change); +} + +#[test(tokio::test)] +async fn test_peer_state_last_sent_heads() { + let Scenario { + repo_handle_1, + repo_handle_2, + document_handle_1, + document_id, + .. + } = scenario().await; + + let heads_before = document_handle_1.with_doc(|d| d.get_heads()); + + // Get the peer state on repo 1 + let peer_state = tokio::time::timeout( + Duration::from_millis(100), + repo_handle_1.peer_state( + repo_handle_2.get_repo_id().clone(), + document_handle_1.document_id(), + ), + ) + .await + .expect("timed out getting peer state") + .expect("peer state was none"); + let last_sent_heads_before_change = peer_state + .last_sent_heads + .expect("last sent heads before should be some"); + + assert_eq!(heads_before, last_sent_heads_before_change); + + // Now make a change on repo 1 + document_handle_1.with_doc_mut(|doc| { + let mut tx = doc.transaction(); + tx.put(automerge::ROOT, "foo", "baz") + .expect("Failed to change the document."); + tx.commit(); + }); + + let heads_after_change = document_handle_1.with_doc(|d| d.get_heads()); + + let peer_state_after = tokio::time::timeout( + Duration::from_millis(100), + repo_handle_1.peer_state(repo_handle_2.get_repo_id().clone(), document_id), + ) + .await + .expect("timed out getting peer state") + .expect("peer state was none"); + + let last_sent_heads_after = peer_state_after + .last_sent_heads + .expect("last received after should be some"); + assert_eq!(last_sent_heads_after, heads_after_change); +} + +#[test(tokio::test)] +async fn test_last_acked_heads() { + let Scenario { + repo_handle_1, + repo_handle_2, + document_handle_1, + document_id, + .. + } = scenario().await; + + let heads_before = document_handle_1.with_doc(|d| d.get_heads()); + + // Wait for the sync to run a bit + tokio::time::sleep(Duration::from_millis(100)).await; + + // Get the peer state on repo 1 + let peer_state = tokio::time::timeout( + Duration::from_millis(100), + repo_handle_1.peer_state(repo_handle_2.get_repo_id().clone(), document_id), + ) + .await + .expect("timed out getting peer state") + .expect("peer state was none"); + let last_acked_heads_before_change = peer_state + .last_acked_heads + .expect("last acked heads before should be some"); + + assert_eq!(heads_before, last_acked_heads_before_change); + + // Now make a change on repo 1 + document_handle_1.with_doc_mut(|doc| { + let mut tx = doc.transaction(); + tx.put(automerge::ROOT, "foo", "baz") + .expect("Failed to change the document."); + tx.commit(); + }); +}