diff --git a/bindings/matrix-sdk-ffi/src/timeline/mod.rs b/bindings/matrix-sdk-ffi/src/timeline/mod.rs index c9075a928c1..89dc9c3da55 100644 --- a/bindings/matrix-sdk-ffi/src/timeline/mod.rs +++ b/bindings/matrix-sdk-ffi/src/timeline/mod.rs @@ -1005,7 +1005,10 @@ impl TimelineItem { #[derive(Clone, uniffi::Enum)] pub enum EventSendState { /// The local event has not been sent yet. - NotSentYet, + NotSentYet { + /// When the send was first attempted. + created_at: Option, + }, /// The local event has been sent to the server, but unsuccessfully: The /// sending has failed. @@ -1019,6 +1022,9 @@ pub enum EventSendState { /// while an unrecoverable error will be parked, until the user /// decides to cancel sending it. is_recoverable: bool, + + /// When the send was first attempted. + created_at: Option, }, /// The local event has been sent successfully to the server. @@ -1030,12 +1036,15 @@ impl From<&matrix_sdk_ui::timeline::EventSendState> for EventSendState { use matrix_sdk_ui::timeline::EventSendState::*; match value { - NotSentYet => Self::NotSentYet, - SendingFailed { error, is_recoverable } => { + NotSentYet { created_at } => { + Self::NotSentYet { created_at: created_at.map(|ts| ts.as_secs().into()) } + } + SendingFailed { error, is_recoverable, created_at } => { let as_queue_wedge_error: matrix_sdk::QueueWedgeError = (&**error).into(); Self::SendingFailed { is_recoverable: *is_recoverable, error: as_queue_wedge_error.into(), + created_at: created_at.map(|ts| ts.as_secs().into()), } } Sent { event_id } => Self::Sent { event_id: event_id.to_string() }, diff --git a/crates/matrix-sdk-base/src/store/memory_store.rs b/crates/matrix-sdk-base/src/store/memory_store.rs index 35bf486a5c5..4233f49df6a 100644 --- a/crates/matrix-sdk-base/src/store/memory_store.rs +++ b/crates/matrix-sdk-base/src/store/memory_store.rs @@ -808,17 +808,18 @@ impl StateStore for MemoryStore { transaction_id: OwnedTransactionId, kind: QueuedRequestKind, priority: usize, - ) -> Result<(), Self::Error> { + ) -> Result { + let created_at = MilliSecondsSinceUnixEpoch::now(); self.send_queue_events.write().unwrap().entry(room_id.to_owned()).or_default().push( QueuedRequest { kind, transaction_id, error: None, priority, - created_at: Some(MilliSecondsSinceUnixEpoch::now()), + created_at: Some(created_at), }, ); - Ok(()) + Ok(created_at) } async fn update_send_queue_request( @@ -908,17 +909,18 @@ impl StateStore for MemoryStore { parent_transaction_id: &TransactionId, own_transaction_id: ChildTransactionId, content: DependentQueuedRequestKind, - ) -> Result<(), Self::Error> { + ) -> Result { + let created_at = MilliSecondsSinceUnixEpoch::now(); self.dependent_send_queue_events.write().unwrap().entry(room.to_owned()).or_default().push( DependentQueuedRequest { kind: content, parent_transaction_id: parent_transaction_id.to_owned(), own_transaction_id, parent_key: None, - created_at: None, + created_at: Some(created_at), }, ); - Ok(()) + Ok(created_at) } async fn mark_dependent_queued_requests_as_ready( diff --git a/crates/matrix-sdk-base/src/store/traits.rs b/crates/matrix-sdk-base/src/store/traits.rs index 6e34f4fe263..ca1bd205e63 100644 --- a/crates/matrix-sdk-base/src/store/traits.rs +++ b/crates/matrix-sdk-base/src/store/traits.rs @@ -35,8 +35,8 @@ use ruma::{ }, serde::Raw, time::SystemTime, - EventId, OwnedEventId, OwnedMxcUri, OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId, - TransactionId, UserId, + EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri, OwnedRoomId, + OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId, }; use serde::{Deserialize, Serialize}; @@ -361,7 +361,7 @@ pub trait StateStore: AsyncTraitDeps { transaction_id: OwnedTransactionId, request: QueuedRequestKind, priority: usize, - ) -> Result<(), Self::Error>; + ) -> Result; /// Updates a send queue request with the given content, and resets its /// error status. @@ -422,7 +422,7 @@ pub trait StateStore: AsyncTraitDeps { parent_txn_id: &TransactionId, own_txn_id: ChildTransactionId, content: DependentQueuedRequestKind, - ) -> Result<(), Self::Error>; + ) -> Result; /// Mark a set of dependent send queue requests as ready, using a key /// identifying the homeserver's response. @@ -659,7 +659,7 @@ impl StateStore for EraseStateStoreError { transaction_id: OwnedTransactionId, content: QueuedRequestKind, priority: usize, - ) -> Result<(), Self::Error> { + ) -> Result { self.0 .save_send_queue_request(room_id, transaction_id, content, priority) .await @@ -712,7 +712,7 @@ impl StateStore for EraseStateStoreError { parent_txn_id: &TransactionId, own_txn_id: ChildTransactionId, content: DependentQueuedRequestKind, - ) -> Result<(), Self::Error> { + ) -> Result { self.0 .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, content) .await diff --git a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs index ec174a4422b..7f9650f5977 100644 --- a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs +++ b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs @@ -37,8 +37,12 @@ use ruma::{ events::{ presence::PresenceEvent, receipt::{Receipt, ReceiptThread, ReceiptType}, - room::member::{ - MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent, SyncRoomMemberEvent, + room::{ + create, + member::{ + MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent, + SyncRoomMemberEvent, + }, }, AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent, GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType, SyncStateEvent, @@ -442,7 +446,7 @@ struct PersistedQueuedRequest { /// The time the original message was first attempted to be sent at. #[serde(skip_serializing_if = "Option::is_none")] created_at: Option, - + // Migrated fields: keep these private, they're not used anymore elsewhere in the code base. /// Deprecated (from old format), now replaced with error field. is_wedged: Option, @@ -1370,7 +1374,7 @@ impl_state_store!({ transaction_id: OwnedTransactionId, kind: QueuedRequestKind, priority: usize, - ) -> Result<()> { + ) -> Result { let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id); let tx = self @@ -1389,7 +1393,7 @@ impl_state_store!({ || Ok(Vec::new()), |val| self.deserialize_value::>(&val), )?; - + let created_at = MilliSecondsSinceUnixEpoch::now(); // Push the new request. prev.push(PersistedQueuedRequest { room_id: room_id.to_owned(), @@ -1399,7 +1403,7 @@ impl_state_store!({ is_wedged: None, event: None, priority: Some(priority), - created_at: Some(MilliSecondsSinceUnixEpoch::now()), + created_at: Some(created_at.clone()), }); // Save the new vector into db. @@ -1407,7 +1411,7 @@ impl_state_store!({ tx.await.into_result()?; - Ok(()) + Ok(created_at) } async fn update_send_queue_request( @@ -1570,7 +1574,7 @@ impl_state_store!({ parent_txn_id: &TransactionId, own_txn_id: ChildTransactionId, content: DependentQueuedRequestKind, - ) -> Result<()> { + ) -> Result { let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id); let tx = self.inner.transaction_on_one_with_mode( @@ -1589,13 +1593,14 @@ impl_state_store!({ |val| self.deserialize_value::>(&val), )?; + let created_at = MilliSecondsSinceUnixEpoch::now(); // Push the new request. prev.push(DependentQueuedRequest { kind: content, parent_transaction_id: parent_txn_id.to_owned(), own_transaction_id: own_txn_id, parent_key: None, - created_at: None, + created_at: Some(created_at.clone()), }); // Save the new vector into db. @@ -1603,7 +1608,7 @@ impl_state_store!({ tx.await.into_result()?; - Ok(()) + Ok(created_at) } async fn update_dependent_queued_request( diff --git a/crates/matrix-sdk-sqlite/src/state_store.rs b/crates/matrix-sdk-sqlite/src/state_store.rs index 32d45f440c6..6c940bab89c 100644 --- a/crates/matrix-sdk-sqlite/src/state_store.rs +++ b/crates/matrix-sdk-sqlite/src/state_store.rs @@ -1759,7 +1759,7 @@ impl StateStore for SqliteStateStore { transaction_id: OwnedTransactionId, content: QueuedRequestKind, priority: usize, - ) -> Result<(), Self::Error> { + ) -> Result { let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id); let room_id_value = self.serialize_value(&room_id.to_owned())?; @@ -1769,11 +1769,13 @@ impl StateStore for SqliteStateStore { // it (with encode_key) or encrypt it (through serialize_value). After // all, it carries no personal information, so this is considered fine. + let created_at = MilliSecondsSinceUnixEpoch::now(); + let created_at_ts: u64 = created_at.0.into(); self.acquire() .await? .with_transaction(move |txn| { - txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority) VALUES (?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority))?; - Ok(()) + txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?; + Ok(created_at) }) .await } @@ -1914,7 +1916,7 @@ impl StateStore for SqliteStateStore { parent_txn_id: &TransactionId, own_txn_id: ChildTransactionId, content: DependentQueuedRequestKind, - ) -> Result<()> { + ) -> Result { let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id); let content = self.serialize_json(&content)?; @@ -1922,16 +1924,24 @@ impl StateStore for SqliteStateStore { let parent_txn_id = parent_txn_id.to_string(); let own_txn_id = own_txn_id.to_string(); + let created_at = MilliSecondsSinceUnixEpoch::now(); + let created_at_ts: u64 = created_at.0.into(); self.acquire() .await? .with_transaction(move |txn| { txn.prepare_cached( r#"INSERT INTO dependent_send_queue_events - (room_id, parent_transaction_id, own_transaction_id, content) - VALUES (?, ?, ?, ?)"#, + (room_id, parent_transaction_id, own_transaction_id, content, created_at) + VALUES (?, ?, ?, ?, ?)"#, )? - .execute((room_id, parent_txn_id, own_txn_id, content))?; - Ok(()) + .execute(( + room_id, + parent_txn_id, + own_txn_id, + content, + created_at_ts, + ))?; + Ok(created_at) }) .await } diff --git a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs index 13638ed6230..b1301b7d96a 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs @@ -950,7 +950,9 @@ impl TimelineController

{ warn!("We looked for a local item, but it transitioned as remote??"); return false; }; - prev_local_item.with_send_state(EventSendState::NotSentYet) + prev_local_item.with_send_state(EventSendState::NotSentYet { + created_at: prev_local_item.send_handle.clone().map(|h| h.created_at).flatten(), + }) }; // Replace the local-related state (kind) and the content state. @@ -1260,6 +1262,7 @@ impl TimelineController

{ } }; + let created_at = send_handle.created_at.clone(); self.handle_local_event( echo.transaction_id.clone(), TimelineEventKind::Message { content, relations: Default::default() }, @@ -1273,6 +1276,7 @@ impl TimelineController

{ EventSendState::SendingFailed { error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(send_error)), is_recoverable: false, + created_at, }, ) .await; @@ -1356,16 +1360,25 @@ impl TimelineController

{ } } - RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => { + RoomSendQueueUpdate::SendError { + transaction_id, + error, + is_recoverable, + created_at, + } => { self.update_event_send_state( &transaction_id, - EventSendState::SendingFailed { error, is_recoverable }, + EventSendState::SendingFailed { error, is_recoverable, created_at }, ) .await; } - RoomSendQueueUpdate::RetryEvent { transaction_id } => { - self.update_event_send_state(&transaction_id, EventSendState::NotSentYet).await; + RoomSendQueueUpdate::RetryEvent { transaction_id, created_at } => { + self.update_event_send_state( + &transaction_id, + EventSendState::NotSentYet { created_at }, + ) + .await; } RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => { diff --git a/crates/matrix-sdk-ui/src/timeline/event_handler.rs b/crates/matrix-sdk-ui/src/timeline/event_handler.rs index cc01448714b..0ffb2b4b3c0 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_handler.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_handler.rs @@ -1024,7 +1024,9 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { let kind: EventTimelineItemKind = match &self.ctx.flow { Flow::Local { txn_id, send_handle } => LocalEventTimelineItem { - send_state: EventSendState::NotSentYet, + send_state: EventSendState::NotSentYet { + created_at: send_handle.clone().map(|h| h.created_at).flatten(), + }, transaction_id: txn_id.to_owned(), send_handle: send_handle.clone(), } diff --git a/crates/matrix-sdk-ui/src/timeline/event_item/local.rs b/crates/matrix-sdk-ui/src/timeline/event_item/local.rs index 2890b6eab8e..de835734c92 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_item/local.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_item/local.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use as_variant::as_variant; use matrix_sdk::{send_queue::SendHandle, Error}; -use ruma::{EventId, OwnedEventId, OwnedTransactionId}; +use ruma::{EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId}; use super::TimelineEventItemId; @@ -65,7 +65,10 @@ impl LocalEventTimelineItem { #[derive(Clone, Debug)] pub enum EventSendState { /// The local event has not been sent yet. - NotSentYet, + NotSentYet { + /// When the send was first attempted. + created_at: Option, + }, /// The local event has been sent to the server, but unsuccessfully: The /// sending has failed. SendingFailed { @@ -77,6 +80,9 @@ pub enum EventSendState { /// while an unrecoverable error will be parked, until the user /// decides to cancel sending it. is_recoverable: bool, + + /// When the send was first attempted. + created_at: Option, }, /// The local event has been sent successfully to the server. Sent { diff --git a/crates/matrix-sdk-ui/src/timeline/tests/echo.rs b/crates/matrix-sdk-ui/src/timeline/tests/echo.rs index cd88dfd9b73..b127576af88 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/echo.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/echo.rs @@ -50,7 +50,7 @@ async fn test_remote_echo_full_trip() { let item = assert_next_matches!(stream, VectorDiff::PushBack { value } => value); let event_item = item.as_event().unwrap(); assert!(event_item.is_local_echo()); - assert_matches!(event_item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(event_item.send_state(), Some(EventSendState::NotSentYet { .. })); assert!(!event_item.can_be_replied_to()); item.unique_id().to_owned() }; @@ -72,7 +72,7 @@ async fn test_remote_echo_full_trip() { .controller .update_event_send_state( &txn_id, - EventSendState::SendingFailed { error, is_recoverable: true }, + EventSendState::SendingFailed { error, is_recoverable: true, created_at: None }, ) .await; @@ -334,7 +334,7 @@ async fn test_no_reuse_of_counters() { let local_id = assert_next_matches_with_timeout!(stream, VectorDiff::PushBack { value: item } => { let event_item = item.as_event().unwrap(); assert!(event_item.is_local_echo()); - assert_matches!(event_item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(event_item.send_state(), Some(EventSendState::NotSentYet{ .. })); assert!(!event_item.can_be_replied_to()); item.unique_id().to_owned() }); diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs b/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs index 1479c3648c3..cb7dd28fefa 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs @@ -85,7 +85,7 @@ async fn test_echo() { assert_let!(Some(VectorDiff::PushBack { value: local_echo }) = timeline_stream.next().await); let item = local_echo.as_event().unwrap(); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { .. })); assert_let!(TimelineItemContent::Message(msg) = item.content()); assert_let!(MessageType::Text(text) = msg.msgtype()); assert_eq!(text.body, "Hello, World!"); @@ -172,7 +172,7 @@ async fn test_retry_failed() { // First, local echo is added. assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => { - assert_matches!(value.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(value.send_state(), Some(EventSendState::NotSentYet{ ..})); }); // Sending fails, because the error is a transient one that's recoverable, @@ -253,7 +253,7 @@ async fn test_dedup_by_event_id_late() { let local_echo = assert_next_matches_with_timeout!(timeline_stream, VectorDiff::PushBack { value } => value); let item = local_echo.as_event().unwrap(); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { .. })); // Timeline: [day-divider, local echo] let day_divider = assert_next_matches_with_timeout!( timeline_stream, VectorDiff::PushFront { value } => value); @@ -318,7 +318,7 @@ async fn test_cancel_failed() { // Local echo is added (immediately) assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => { - assert_matches!(value.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(value.send_state(), Some(EventSendState::NotSentYet{ ..})); }); // Sending fails, the mock server has no matching route diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/edit.rs b/crates/matrix-sdk-ui/tests/integration/timeline/edit.rs index 4ad9e834238..f878124d8ac 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/edit.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/edit.rs @@ -206,7 +206,7 @@ async fn test_edit_local_echo() { let internal_id = item.unique_id(); let item = item.as_event().unwrap(); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { .. })); assert_let!(Some(VectorDiff::PushFront { value: day_divider }) = timeline_stream.next().await); assert!(day_divider.is_day_divider()); @@ -255,7 +255,7 @@ async fn test_edit_local_echo() { assert!(item.is_local_echo()); // The send state has been reset. - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { .. })); let edit_message = item.content().as_message().unwrap(); assert_eq!(edit_message.body(), "hello, world"); @@ -778,7 +778,7 @@ async fn test_edit_local_echo_with_unsupported_content() { assert_let!(Some(VectorDiff::PushBack { value: item }) = timeline_stream.next().await); let item = item.as_event().unwrap(); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { .. })); assert_let!(Some(VectorDiff::PushFront { value: day_divider }) = timeline_stream.next().await); assert!(day_divider.is_day_divider()); @@ -831,7 +831,7 @@ async fn test_edit_local_echo_with_unsupported_content() { assert_let!(Some(VectorDiff::PushBack { value: item }) = timeline_stream.next().await); let item = item.as_event().unwrap(); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { .. })); // Let's edit the local echo (poll start) with an unsupported type (message). let edit_err = timeline diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/media.rs b/crates/matrix-sdk-ui/tests/integration/timeline/media.rs index e11ea3a8283..0886b908ef3 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/media.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/media.rs @@ -104,7 +104,7 @@ async fn test_send_attachment() { { assert_let_timeout!(Some(VectorDiff::PushBack { value: item }) = timeline_stream.next()); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { .. })); assert_let!(TimelineItemContent::Message(msg) = item.content()); // Body is the caption, because there's both a caption and filename. @@ -125,7 +125,7 @@ async fn test_send_attachment() { Some(VectorDiff::Set { index: 1, value: item }) = timeline_stream.next() ); assert_let!(TimelineItemContent::Message(msg) = item.content()); - assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet { .. })); assert_eq!(get_filename_and_caption(msg.msgtype()), ("test.bin", Some("caption"))); // The URI now refers to the final MXC URI. diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/mod.rs b/crates/matrix-sdk-ui/tests/integration/timeline/mod.rs index df1319c7066..844ea21204a 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/mod.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/mod.rs @@ -287,7 +287,7 @@ async fn test_redact_message() { assert_let!(Some(VectorDiff::PushBack { value: second }) = timeline_stream.next().await); let second = second.as_event().unwrap(); - assert_matches!(second.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(second.send_state(), Some(EventSendState::NotSentYet { .. })); // We haven't set a route for sending events, so this will fail. assert_let!(Some(VectorDiff::Set { index, value: second }) = timeline_stream.next().await); @@ -345,7 +345,7 @@ async fn test_redact_local_sent_message() { assert_let_timeout!(Some(VectorDiff::PushBack { value: item }) = timeline_stream.next()); let event = item.as_event().unwrap(); assert!(event.is_local_echo()); - assert_matches!(event.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(event.send_state(), Some(EventSendState::NotSentYet { .. })); // As well as a day divider. assert_let_timeout!( diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs b/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs index edc522c5ab0..cdeffc5ab5a 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs @@ -273,7 +273,7 @@ async fn test_reloaded_failed_local_echoes_are_marked_as_failed() { // Local echoes are updated with the failed send state as soon as the error // response has been received. assert_let!(Some(VectorDiff::Set { index: 0, value: first }) = timeline_stream.next().await); - let (error, is_recoverable) = assert_matches!(first.send_state().unwrap(), EventSendState::SendingFailed { error, is_recoverable } => (error, is_recoverable)); + let (error, is_recoverable) = assert_matches!(first.send_state().unwrap(), EventSendState::SendingFailed { error, is_recoverable, .. } => (error, is_recoverable)); // The error is not recoverable. assert!(!is_recoverable); @@ -292,7 +292,7 @@ async fn test_reloaded_failed_local_echoes_are_marked_as_failed() { assert_eq!(initial.len(), 1); assert_eq!(initial[0].content().as_message().unwrap().body(), "wall of text"); assert_let!( - Some(EventSendState::SendingFailed { error, is_recoverable }) = initial[0].send_state() + Some(EventSendState::SendingFailed { error, is_recoverable, .. }) = initial[0].send_state() ); // Same recoverable status as above. @@ -374,7 +374,7 @@ async fn test_clear_with_echoes() { // The message that failed to send. assert_matches!(event_items[1].send_state(), Some(EventSendState::SendingFailed { .. })); // The message that is still pending. - assert_matches!(event_items[2].send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(event_items[2].send_state(), Some(EventSendState::NotSentYet { .. })); // When we clear the timeline now, timeline.clear().await; @@ -385,7 +385,7 @@ async fn test_clear_with_echoes() { assert_eq!(event_items.len(), 2); assert_matches!(event_items[0].send_state(), Some(EventSendState::SendingFailed { .. })); - assert_matches!(event_items[1].send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(event_items[1].send_state(), Some(EventSendState::NotSentYet { .. })); } #[async_test] diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/replies.rs b/crates/matrix-sdk-ui/tests/integration/timeline/replies.rs index 580bdc92860..7c02510927d 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/replies.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/replies.rs @@ -330,7 +330,7 @@ async fn test_send_reply() { let reply_item = assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => value); - assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet { .. })); let reply_message = reply_item.content().as_message().unwrap(); assert_eq!(reply_message.body(), "Replying to Bob"); let in_reply_to = reply_message.in_reply_to().unwrap(); @@ -440,7 +440,7 @@ async fn test_send_reply_to_self() { let reply_item = assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => value); - assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet { .. })); let reply_message = reply_item.content().as_message().unwrap(); assert_eq!(reply_message.body(), "Replying to self"); let in_reply_to = reply_message.in_reply_to().unwrap(); @@ -533,7 +533,7 @@ async fn test_send_reply_to_threaded() { let reply_item = assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => value); - assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet { .. })); let reply_message = reply_item.content().as_message().unwrap(); // The reply should be considered part of the thread. @@ -663,7 +663,7 @@ async fn test_send_reply_with_event_id() { let reply_item = assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => value); - assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet { .. })); let reply_message = reply_item.content().as_message().unwrap(); assert_eq!(reply_message.body(), "Replying to Bob"); let in_reply_to = reply_message.in_reply_to().unwrap(); @@ -779,7 +779,7 @@ async fn test_send_reply_with_event_id_that_is_redacted() { let reply_item = assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => value); - assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet { .. })); let reply_message = reply_item.content().as_message().unwrap(); assert_eq!(reply_message.body(), "Replying to Bob"); let in_reply_to = reply_message.in_reply_to().unwrap(); diff --git a/crates/matrix-sdk/src/send_queue.rs b/crates/matrix-sdk/src/send_queue.rs index 20d4bed9a78..992c9ba2f7d 100644 --- a/crates/matrix-sdk/src/send_queue.rs +++ b/crates/matrix-sdk/src/send_queue.rs @@ -162,7 +162,7 @@ use ruma::{ AnyMessageLikeEventContent, EventContent as _, }, serde::Raw, - OwnedEventId, OwnedRoomId, OwnedTransactionId, TransactionId, + MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId, TransactionId, }; use tokio::sync::{broadcast, oneshot, Mutex, Notify, OwnedMutexGuard}; use tracing::{debug, error, info, instrument, trace, warn}; @@ -444,7 +444,7 @@ impl RoomSendQueue { let content = SerializableEventContent::from_raw(content, event_type); - let transaction_id = self.inner.queue.push(content.clone().into()).await?; + let (transaction_id, created_at) = self.inner.queue.push(content.clone().into()).await?; trace!(%transaction_id, "manager sends a raw event to the background task"); self.inner.notifier.notify_one(); @@ -453,6 +453,7 @@ impl RoomSendQueue { room: self.clone(), transaction_id: transaction_id.clone(), media_handles: None, + created_at: Some(created_at), }; let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { @@ -562,6 +563,7 @@ impl RoomSendQueue { }; let txn_id = queued_request.transaction_id.clone(); + let txn_created_at = queued_request.created_at.clone(); trace!(txn_id = %txn_id, "received a request to send!"); let related_txn_id = as_variant!(&queued_request.kind, QueuedRequestKind::MediaUpload { related_to, .. } => related_to.clone()); @@ -661,6 +663,7 @@ impl RoomSendQueue { transaction_id: related_txn_id.unwrap_or(txn_id), error, is_recoverable, + created_at: txn_created_at, }); } } @@ -950,10 +953,11 @@ impl QueueStorage { async fn push( &self, request: QueuedRequestKind, - ) -> Result { + ) -> Result<(OwnedTransactionId, MilliSecondsSinceUnixEpoch), RoomSendQueueStorageError> { let transaction_id = TransactionId::new(); - self.store + let created_at = self + .store .lock() .await .client()? @@ -966,7 +970,7 @@ impl QueueStorage { ) .await?; - Ok(transaction_id) + Ok((transaction_id, created_at)) } /// Peeks the next request to be sent, marking it as being sent. @@ -1183,7 +1187,7 @@ impl QueueStorage { upload_file_txn: OwnedTransactionId, file_media_request: MediaRequestParameters, thumbnail: Option<(FinishUploadThumbnailInfo, MediaRequestParameters, Mime)>, - ) -> Result<(), RoomSendQueueStorageError> { + ) -> Result { let guard = self.store.lock().await; let client = guard.client()?; let store = client.store(); @@ -1244,7 +1248,7 @@ impl QueueStorage { }; // Push the dependent request for the event itself. - store + let created_at = store .save_dependent_queued_request( &self.room_id, &upload_file_txn, @@ -1257,7 +1261,7 @@ impl QueueStorage { ) .await?; - Ok(()) + Ok(created_at) } /// Reacts to the given local echo of an event. @@ -1266,7 +1270,8 @@ impl QueueStorage { &self, transaction_id: &TransactionId, key: String, - ) -> Result, RoomSendQueueStorageError> { + ) -> Result, RoomSendQueueStorageError> + { let guard = self.store.lock().await; let client = guard.client()?; let store = client.store(); @@ -1290,7 +1295,7 @@ impl QueueStorage { // Record the dependent request. let reaction_txn_id = ChildTransactionId::new(); - store + let created_at = store .save_dependent_queued_request( &self.room_id, transaction_id, @@ -1299,7 +1304,7 @@ impl QueueStorage { ) .await?; - Ok(Some(reaction_txn_id)) + Ok(Some((reaction_txn_id, created_at))) } /// Returns a list of the local echoes, that is, all the requests that we're @@ -1323,6 +1328,7 @@ impl QueueStorage { room: room.clone(), transaction_id: queued.transaction_id, media_handles: None, + created_at: queued.created_at.clone(), }, send_error: queued.error, }, @@ -1354,6 +1360,7 @@ impl QueueStorage { send_handle: SendReactionHandle { room: room.clone(), transaction_id: dep.own_transaction_id, + created_at: dep.created_at.clone(), }, applies_to: dep.parent_transaction_id, }, @@ -1382,6 +1389,7 @@ impl QueueStorage { upload_thumbnail_txn: thumbnail_info.map(|info| info.txn), upload_file_txn: file_upload, }), + created_at: dep.created_at.clone(), }, send_error: None, }, @@ -1780,12 +1788,18 @@ pub enum RoomSendQueueUpdate { /// while an unrecoverable error will be parked, until the user /// decides to cancel sending it. is_recoverable: bool, + + /// When the request was initially sent. + created_at: Option, }, /// The event has been unwedged and sending is now being retried. RetryEvent { /// Transaction id used to identify this event. transaction_id: OwnedTransactionId, + + /// When the request was initially sent. + created_at: Option, }, /// The event has been sent to the server, and the query returned @@ -1888,6 +1902,9 @@ pub struct SendHandle { /// Additional handles for a media upload. media_handles: Option, + + /// The time that this send handle was first created + pub created_at: Option, } impl SendHandle { @@ -2055,9 +2072,10 @@ impl SendHandle { // Wake up the queue, in case the room was asleep before unwedging the request. room.notifier.notify_one(); - let _ = room - .updates - .send(RoomSendQueueUpdate::RetryEvent { transaction_id: self.transaction_id.clone() }); + let _ = room.updates.send(RoomSendQueueUpdate::RetryEvent { + transaction_id: self.transaction_id.clone(), + created_at: self.created_at.clone(), + }); Ok(()) } @@ -2073,7 +2091,7 @@ impl SendHandle { ) -> Result, RoomSendQueueStorageError> { trace!("received an intent to react"); - if let Some(reaction_txn_id) = + if let Some((reaction_txn_id, created_at)) = self.room.inner.queue.react(&self.transaction_id, key.clone()).await? { trace!("successfully queued react"); @@ -2085,6 +2103,7 @@ impl SendHandle { let send_handle = SendReactionHandle { room: self.room.clone(), transaction_id: reaction_txn_id.clone(), + created_at: Some(created_at), }; let _ = self.room.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { @@ -2113,6 +2132,8 @@ pub struct SendReactionHandle { room: RoomSendQueue, /// The own transaction id for the reaction. transaction_id: ChildTransactionId, + /// When this send request was first created. + created_at: Option, } impl SendReactionHandle { @@ -2138,6 +2159,8 @@ impl SendReactionHandle { room: self.room.clone(), transaction_id: self.transaction_id.clone().into(), media_handles: None, + // This isn't quite right, should this be the newly enqueued cancel event? + created_at: self.created_at.clone(), }; handle.abort().await diff --git a/crates/matrix-sdk/src/send_queue/upload.rs b/crates/matrix-sdk/src/send_queue/upload.rs index 269f6ca3858..67d96657b88 100644 --- a/crates/matrix-sdk/src/send_queue/upload.rs +++ b/crates/matrix-sdk/src/send_queue/upload.rs @@ -230,7 +230,8 @@ impl RoomSendQueue { ); // Save requests in the queue storage. - self.inner + let created_at = self + .inner .queue .push_media( event_content.clone(), @@ -250,6 +251,7 @@ impl RoomSendQueue { room: self.clone(), transaction_id: send_event_txn.clone().into(), media_handles: Some(MediaHandles { upload_thumbnail_txn, upload_file_txn }), + created_at: Some(created_at), }; let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { diff --git a/crates/matrix-sdk/tests/integration/send_queue.rs b/crates/matrix-sdk/tests/integration/send_queue.rs index 8024a26fb04..539d253e4b2 100644 --- a/crates/matrix-sdk/tests/integration/send_queue.rs +++ b/crates/matrix-sdk/tests/integration/send_queue.rs @@ -214,7 +214,7 @@ macro_rules! assert_update { // Check the next stream event is a retry event, with optional checks on txn=$txn ($watch:ident => retry { $(txn=$txn:expr)? }) => { assert_let!( - Ok(Ok(RoomSendQueueUpdate::RetryEvent { transaction_id: _txn })) = + Ok(Ok(RoomSendQueueUpdate::RetryEvent { transaction_id: _txn, created_at: _created_at })) = timeout(Duration::from_secs(1), $watch.recv()).await ); @@ -239,7 +239,7 @@ macro_rules! assert_update { // Returns the error for additional checks. ($watch:ident => error { $(recoverable=$recoverable:expr,)? $(txn=$txn:expr)? }) => {{ assert_let!( - Ok(Ok(RoomSendQueueUpdate::SendError { transaction_id: _txn, error, is_recoverable: _is_recoverable })) = + Ok(Ok(RoomSendQueueUpdate::SendError { transaction_id: _txn, error, is_recoverable: _is_recoverable, created_at: _created_at })) = timeout(Duration::from_secs(10), $watch.recv()).await ); diff --git a/testing/matrix-sdk-integration-testing/src/tests/timeline.rs b/testing/matrix-sdk-integration-testing/src/tests/timeline.rs index 531ca7a1d21..5faaa670694 100644 --- a/testing/matrix-sdk-integration-testing/src/tests/timeline.rs +++ b/testing/matrix-sdk-integration-testing/src/tests/timeline.rs @@ -258,7 +258,7 @@ async fn test_stale_local_echo_time_abort_edit() { } assert!(local_echo.is_editable()); - assert_matches!(local_echo.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(local_echo.send_state(), Some(EventSendState::NotSentYet { .. })); assert_eq!(local_echo.content().as_message().unwrap().body(), "hi!"); let mut has_sender_profile = local_echo.sender_profile().is_ready();