Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Peer state #72

Merged
merged 1 commit into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/interfaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use serde::{Deserialize, Serialize};
use std::{
fmt::{Display, Formatter},
str::FromStr,
time::Instant,
};

#[derive(Debug, Eq, Hash, PartialEq, Clone)]
Expand Down Expand Up @@ -193,3 +194,15 @@ pub trait Storage: Send {
_full_doc: Vec<u8>,
) -> BoxFuture<'static, Result<(), StorageError>>;
}

/// The state of sycnhronization of a document with a remote peer obtained via [`RepoHandle::peer_state`](crate::RepoHandle::peer_state)
pub struct PeerState {
/// When we last received a message from this peer
pub last_received: Option<Instant>,
/// When we last sent a message to this peer
pub last_sent: Option<Instant>,
/// The heads of the document when we last sent a message
pub last_sent_heads: Option<Vec<automerge::ChangeHash>>,
/// The last heads of the document that the peer said they had
pub last_acked_heads: Option<Vec<automerge::ChangeHash>>,
}
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub use share_policy::{SharePolicy, SharePolicyError};

pub use crate::dochandle::DocHandle;
pub use crate::interfaces::{
DocumentId, Message, NetworkError, RepoId, RepoMessage, Storage, StorageError,
DocumentId, Message, NetworkError, PeerState, RepoId, RepoMessage, Storage, StorageError,
};
pub use crate::network_connect::ConnDirection;
pub use crate::repo::{Repo, RepoError, RepoHandle};
Expand Down
208 changes: 152 additions & 56 deletions src/repo.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::dochandle::{DocHandle, SharedDocument};
use crate::interfaces::{DocumentId, RepoId};
use crate::interfaces::{DocumentId, PeerState, RepoId};
use crate::interfaces::{NetworkError, RepoMessage, Storage, StorageError};
use crate::share_policy::ShareDecision;
use crate::{share_policy, SharePolicy, SharePolicyError};
Expand All @@ -21,6 +21,7 @@ use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Instant;
use uuid::Uuid;

/// Front-end of the repo.
Expand Down Expand Up @@ -210,11 +211,27 @@ impl RepoHandle {
})
.expect("Failed to send repo event.");
}

pub fn peer_state(
&self,
remote_id: RepoId,
document: DocumentId,
) -> RepoFuture<Option<PeerState>> {
let (fut, resolver) = new_repo_future_with_resolver();
self.repo_sender
.send(RepoEvent::GetPeerState {
remote_repo_id: remote_id,
document_id: document,
reply: resolver,
})
.expect("failed to send repo event");
fut
}
}

/// Events sent by repo or doc handles to the repo.
pub(crate) enum RepoEvent {
/// Start processing a new document.
/// Start processing a ew document.
NewDoc(DocumentId, DocumentInfo),
/// A document changed.
DocChange(DocumentId),
Expand All @@ -239,6 +256,11 @@ pub(crate) enum RepoEvent {
stream: Box<dyn Send + Unpin + Stream<Item = Result<RepoMessage, NetworkError>>>,
sink: Box<dyn Send + Unpin + Sink<RepoMessage, Error = NetworkError>>,
},
GetPeerState {
remote_repo_id: RepoId,
document_id: DocumentId,
reply: RepoFutureResolver<Option<PeerState>>,
},
/// Stop the repo.
Stop,
}
Expand All @@ -253,6 +275,7 @@ impl fmt::Debug for RepoEvent {
RepoEvent::LoadDoc(_, _) => f.write_str("RepoEvent::LoadDoc"),
RepoEvent::ListAllDocs(_) => f.write_str("RepoEvent::ListAllDocs"),
RepoEvent::ConnectRemoteRepo { .. } => f.write_str("RepoEvent::ConnectRemoteRepo"),
RepoEvent::GetPeerState { .. } => f.write_str("RepoEvent::GetPeerState"),
RepoEvent::Stop => f.write_str("RepoEvent::Stop"),
}
}
Expand Down Expand Up @@ -573,19 +596,32 @@ pub(crate) struct DocumentInfo {
last_heads: Vec<ChangeHash>,
}

/// A state machine representing a connection between a remote repo and a particular document
#[derive(Debug)]
enum PeerConnection {
/// we've accepted the peer and are syncing with them
Accepted(SyncState),
/// We're waiting for a response from the share policy
PendingAuth { received_messages: Vec<SyncMessage> },
struct PeerConnection {
repo_id: RepoId,
last_recv: Option<Instant>,
last_send: Option<Instant>,
state: PeerConnectionState,
}

impl PeerConnection {
fn pending() -> Self {
PeerConnection::PendingAuth {
received_messages: vec![],
fn pending(repo_id: RepoId) -> Self {
Self {
repo_id,
last_recv: None,
last_send: None,
state: PeerConnectionState::PendingAuth {
received_messages: vec![],
},
}
}

fn ready(repo_id: RepoId) -> Self {
Self {
repo_id,
last_recv: None,
last_send: None,
state: PeerConnectionState::Accepted(SyncState::new()),
}
}

Expand All @@ -594,23 +630,97 @@ impl PeerConnection {
doc: &mut Automerge,
msg: SyncMessage,
) -> Result<(), automerge::AutomergeError> {
match self {
PeerConnection::Accepted(sync_state) => doc.receive_sync_message(sync_state, msg),
PeerConnection::PendingAuth { received_messages } => {
self.last_recv = Some(Instant::now());
match &mut self.state {
PeerConnectionState::Accepted(sync_state) => doc.receive_sync_message(sync_state, msg),
PeerConnectionState::PendingAuth { received_messages } => {
received_messages.push(msg);
Ok(())
}
}
}

fn generate_first_sync_message(
&mut self,
document: &mut automerge::Automerge,
) -> Option<SyncMessage> {
let msg = match &mut self.state {
PeerConnectionState::PendingAuth { received_messages } => {
let mut sync_state = SyncState::new();
for msg in received_messages.drain(..) {
document
.receive_sync_message(&mut sync_state, msg)
.expect("Failed to receive sync message.");
}
let message = document.generate_sync_message(&mut sync_state);
self.state = PeerConnectionState::Accepted(sync_state);
message
}
PeerConnectionState::Accepted(sync_state) => document.generate_sync_message(sync_state),
};
if msg.is_some() {
self.last_send = Some(Instant::now());
}
msg
}

fn generate_sync_message(&mut self, doc: &Automerge) -> Option<SyncMessage> {
match self {
Self::Accepted(sync_state) => doc.generate_sync_message(sync_state),
Self::PendingAuth { .. } => None,
let msg = match &mut self.state {
PeerConnectionState::Accepted(sync_state) => doc.generate_sync_message(sync_state),
PeerConnectionState::PendingAuth { .. } => None,
};
if msg.is_some() {
self.last_send = Some(Instant::now());
}
msg
}

fn promote_pending_peer(&mut self) -> Option<Vec<SyncMessage>> {
if let PeerConnectionState::PendingAuth { received_messages } = &mut self.state {
let result = std::mem::take(received_messages);
self.state = PeerConnectionState::Accepted(SyncState::new());
if !result.is_empty() {
self.last_send = Some(Instant::now());
}
Some(result)
} else {
tracing::warn!(remote=%self.repo_id, "Tried to promote a peer which was not pending authorization");
None
}
}

/// Get the state of synchronization with a remote peer and document
fn peer_state(&self) -> PeerState {
let last_sent_heads = match &self.state {
PeerConnectionState::Accepted(sync_state) => Some(sync_state.last_sent_heads.clone()),
PeerConnectionState::PendingAuth {
received_messages: _,
} => None,
};
let last_acked_heads = match &self.state {
PeerConnectionState::Accepted(sync_state) => Some(sync_state.shared_heads.clone()),
PeerConnectionState::PendingAuth {
received_messages: _,
} => None,
};
PeerState {
last_received: self.last_recv,
last_sent: self.last_send,
last_sent_heads,
last_acked_heads,
}
}
}

/// A state machine representing a connection between a remote repo and a particular document
#[derive(Debug)]
enum PeerConnectionState {
/// we've accepted the peer and are syncing with them
Accepted(SyncState),
/// We're waiting for a response from the share policy
PendingAuth { received_messages: Vec<SyncMessage> },
}

/// A change requested by a peer connection
enum PeerConnCommand {
/// Request authorization from the share policy
Expand Down Expand Up @@ -842,7 +952,7 @@ impl DocumentInfo {
Entry::Vacant(entry) => {
// if this is a new peer, request authorization
commands.push(PeerConnCommand::RequestAuth(repo_id.clone()));
entry.insert(PeerConnection::pending())
entry.insert(PeerConnection::pending(repo_id.clone()))
}
Entry::Occupied(entry) => entry.into_mut(),
};
Expand All @@ -861,48 +971,19 @@ impl DocumentInfo {
///
/// Returns any messages which the peer sent while we were waiting for authorization
fn promote_pending_peer(&mut self, repo_id: &RepoId) -> Option<Vec<SyncMessage>> {
if let Some(PeerConnection::PendingAuth { received_messages }) =
self.peer_connections.remove(repo_id)
{
self.peer_connections
.insert(repo_id.clone(), PeerConnection::Accepted(SyncState::new()));
Some(received_messages)
} else {
tracing::warn!(remote=%repo_id, "Tried to promote a peer which was not pending authorization");
None
}
self.peer_connections
.get_mut(repo_id)
.map(|c| c.promote_pending_peer())
.unwrap_or_default()
}

/// Potentially generate an outgoing sync message.
fn generate_first_sync_message(&mut self, repo_id: RepoId) -> Option<SyncMessage> {
match self.peer_connections.entry(repo_id) {
Entry::Vacant(entry) => {
let mut sync_state = SyncState::new();
let document = self.document.read();
let message = document.automerge.generate_sync_message(&mut sync_state);
entry.insert(PeerConnection::Accepted(sync_state));
message
}
Entry::Occupied(mut entry) => match entry.get_mut() {
PeerConnection::PendingAuth { received_messages } => {
let mut document = self.document.write();
let mut sync_state = SyncState::new();
for msg in received_messages.drain(..) {
document
.automerge
.receive_sync_message(&mut sync_state, msg)
.expect("Failed to receive sync message.");
}
let message = document.automerge.generate_sync_message(&mut sync_state);
entry.insert(PeerConnection::Accepted(sync_state));
message
}
PeerConnection::Accepted(ref mut sync_state) => {
let document = self.document.read();
document.automerge.generate_sync_message(sync_state)
}
},
}
let conn = self
.peer_connections
.entry(repo_id.clone())
.or_insert_with(|| PeerConnection::ready(repo_id));
conn.generate_first_sync_message(&mut self.document.write().automerge)
}

/// Generate outgoing sync message for all repos we are syncing with.
Expand All @@ -916,6 +997,10 @@ impl DocumentInfo {
})
.collect()
}

fn get_peer_state(&self, peer: &RepoId) -> Option<PeerState> {
self.peer_connections.get(peer).map(|p| p.peer_state())
}
}

/// Signal that the stream or sink on the network adapter is ready to be polled.
Expand Down Expand Up @@ -1522,6 +1607,17 @@ impl Repo {
self.sinks_to_poll.insert(repo_id.clone());
self.streams_to_poll.insert(repo_id);
}
RepoEvent::GetPeerState {
remote_repo_id,
document_id,
mut reply,
} => {
reply.resolve_fut(
self.documents
.get(&document_id)
.and_then(|info| info.get_peer_state(&remote_repo_id)),
);
}
RepoEvent::Stop => {
// Handled in the main run loop.
}
Expand Down
1 change: 1 addition & 0 deletions tests/network/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mod document_list;
mod document_load;
mod document_request;
mod document_save;
mod peer_state;

use test_log::test;

Expand Down
Loading
Loading