diff --git a/internal/api/rust/rust.go b/internal/api/rust/rust.go index a712d58..13228fe 100644 --- a/internal/api/rust/rust.go +++ b/internal/api/rust/rust.go @@ -49,6 +49,7 @@ type RustRoomInfo struct { type RustClient struct { FFIClient *matrix_sdk_ffi.Client + syncService *matrix_sdk_ffi.SyncService roomsListener *RoomsListener allRooms *matrix_sdk_ffi.RoomList rooms map[string]*RustRoomInfo @@ -57,7 +58,7 @@ type RustClient struct { persistentStoragePath string opts api.ClientCreationOpts - // for NSE tests + // for push notification tests (single/multi-process) notifClient *matrix_sdk_ffi.NotificationClient } @@ -132,9 +133,14 @@ func (c *RustClient) Opts() api.ClientCreationOpts { func (c *RustClient) GetNotification(t ct.TestLike, roomID, eventID string) (*api.Notification, error) { if c.notifClient == nil { - t.Errorf("RustClient misconfigured. You can only call GetNotification if this is an NSE process. " + - "Ensure opts.EnableCrossProcessRefreshLockProcessName and opts.AccessToken are set!") - return nil, fmt.Errorf("misconfigured rust client") + var err error + c.Logf(t, "creating NotificationClient") + c.notifClient, err = c.FFIClient.NotificationClient(matrix_sdk_ffi.NotificationProcessSetupSingleProcess{ + SyncService: c.syncService, + }) + if err != nil { + return nil, fmt.Errorf("GetNotification: failed to create NotificationClient: %s", err) + } } notifItem, err := c.notifClient.GetNotification(roomID, eventID) if err != nil { @@ -350,6 +356,7 @@ func (c *RustClient) StartSyncing(t ct.TestLike) (stopSyncing func(), err error) } go syncService.Start() c.allRooms = roomList + c.syncService = syncService // track new rooms when they are made allRoomsListener := newGenericStateListener[[]matrix_sdk_ffi.RoomListEntriesUpdate]() go func() { diff --git a/tests/notification_test.go b/tests/notification_test.go index 2836187..a260ad7 100644 --- a/tests/notification_test.go +++ b/tests/notification_test.go @@ -3,7 +3,6 @@ package tests import ( "fmt" "math/rand" - "sync" "testing" "time" @@ -535,35 +534,42 @@ func TestMultiprocessNSEOlmSessionWedge(t *testing.T) { }) } -func TestMultiprocessDupeOTKUpload(t *testing.T) { +// Test that the notification client doesn't cause duplicate OTK uploads. +// Regression test for https://github.com/matrix-org/matrix-rust-sdk/issues/1415 +// +// This test creates a normal Alice rust client and lets it upload OTKs. It then: +// - hooks into /keys/upload requests and artificially delays them by adding 4s of latency +// - creates a Bob client who sends a message to Alice, consuming 1 OTK in the process +// - immediately calls GetNotification on Bob's event as soon as it 200 OKs, which creates +// a NotificationClient inside the same process. +// +// This means there are 2 sync loops: the main Client and the NotificationClient. Both clients +// will see the OTK count being lowered so both may try to upload a new OTK. Because we are +// delaying upload requests by 4s, this increases the chance of both uploads being in-flight at +// the same time. If they do not synchronise this operation, they will both try to upload +// _different keys_ with the _same_ key ID, which causes synapse to HTTP 400 with: +// +// > One time key signed_curve25519:AAAAAAAAADI already exists +// +// Which will fail the test. +func TestNotificationClientDupeOTKUpload(t *testing.T) { if !Instance().ShouldTest(api.ClientTypeRust) { t.Skipf("rust only") return } - t.Skipf("skipped until it is no longer flakey") tc, roomID := createAndJoinRoom(t) // start the "main" app alice := tc.MustLoginClient(t, &cc.ClientCreationRequest{ User: tc.Alice, Opts: api.ClientCreationOpts{ - PersistentStorage: true, - EnableCrossProcessRefreshLockProcessName: "main", + PersistentStorage: true, }, }) + stopSyncing := alice.MustStartSyncing(t) + defer stopSyncing() aliceAccessToken := alice.Opts().AccessToken - // prep nse process - nseAlice := tc.MustCreateClient(t, &cc.ClientCreationRequest{ - User: tc.Alice, - Multiprocess: true, - Opts: api.ClientCreationOpts{ - PersistentStorage: true, - EnableCrossProcessRefreshLockProcessName: api.ProcessNameNSE, - AccessToken: aliceAccessToken, - }, - }) - aliceUploadedNewKeys := false // artificially slow down the HTTP responses, such that we will potentially have 2 in-flight /keys/upload requests // at once. If the NSE and main apps are talking to each other, they should be using the same key ID + key. @@ -588,35 +594,21 @@ func TestMultiprocessDupeOTKUpload(t *testing.T) { return nil }, }, func() { - var eventID string // Bob appears and sends a message, causing Bob to claim one of Alice's OTKs. // The main app will see this in /sync and then try to upload another OTK, which we will tarpit. tc.WithClientSyncing(t, &cc.ClientCreationRequest{ User: tc.Bob, }, func(bob api.Client) { - eventID = bob.SendMessage(t, roomID, "Hello world!") - }) - var wg sync.WaitGroup - wg.Add(2) - go func() { // nse process - defer wg.Done() - // wake up NSE process as if it got a push notification. Calling this function - // should cause the NSE process to upload a OTK as it would have seen 1 has been used. - // The NSE and main app must talk to each other to ensure they use the same key. - nseAlice.Logf(t, "GetNotification %s, %s", roomID, eventID) - notif, err := nseAlice.GetNotification(t, roomID, eventID) + eventID := bob.SendMessage(t, roomID, "Hello world!") + // create a NotificationClient in the same process to fetch this "push notification". + // It might make the NotificationClient upload a OTK as it would have seen 1 has been used. + // The NotificationClient and main Client must talk to each other to ensure they use the same key. + alice.Logf(t, "GetNotification %s, %s", roomID, eventID) + notif, err := alice.GetNotification(t, roomID, eventID) must.NotError(t, "failed to get notification", err) must.Equal(t, notif.Text, "Hello world!", "failed to decrypt msg body") must.Equal(t, notif.FailedToDecrypt, false, "FailedToDecrypt but we should be able to decrypt") - }() - go func() { // app process - defer wg.Done() - stopSyncing := alice.MustStartSyncing(t) - // let alice upload new OTK - time.Sleep(5 * time.Second) - stopSyncing() - }() - wg.Wait() + }) }) if !aliceUploadedNewKeys { t.Errorf("Alice did not upload new OTKs")