Skip to content

Commit

Permalink
Merge pull request #134 from matrix-org/poljar/notification-duplicate…
Browse files Browse the repository at this point in the history
…-olm-machine-fix

Add TestNotificationClientDupeOTKUpload
  • Loading branch information
kegsay authored Sep 24, 2024
2 parents 45a7c16 + 2a837f9 commit 529b1da
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 41 deletions.
15 changes: 11 additions & 4 deletions internal/api/rust/rust.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type RustRoomInfo struct {

type RustClient struct {
FFIClient *matrix_sdk_ffi.Client
syncService *matrix_sdk_ffi.SyncService
roomsListener *RoomsListener
allRooms *matrix_sdk_ffi.RoomList
rooms map[string]*RustRoomInfo
Expand All @@ -57,7 +58,7 @@ type RustClient struct {
persistentStoragePath string
opts api.ClientCreationOpts

// for NSE tests
// for push notification tests (single/multi-process)
notifClient *matrix_sdk_ffi.NotificationClient
}

Expand Down Expand Up @@ -132,9 +133,14 @@ func (c *RustClient) Opts() api.ClientCreationOpts {

func (c *RustClient) GetNotification(t ct.TestLike, roomID, eventID string) (*api.Notification, error) {
if c.notifClient == nil {
t.Errorf("RustClient misconfigured. You can only call GetNotification if this is an NSE process. " +
"Ensure opts.EnableCrossProcessRefreshLockProcessName and opts.AccessToken are set!")
return nil, fmt.Errorf("misconfigured rust client")
var err error
c.Logf(t, "creating NotificationClient")
c.notifClient, err = c.FFIClient.NotificationClient(matrix_sdk_ffi.NotificationProcessSetupSingleProcess{
SyncService: c.syncService,
})
if err != nil {
return nil, fmt.Errorf("GetNotification: failed to create NotificationClient: %s", err)
}
}
notifItem, err := c.notifClient.GetNotification(roomID, eventID)
if err != nil {
Expand Down Expand Up @@ -350,6 +356,7 @@ func (c *RustClient) StartSyncing(t ct.TestLike) (stopSyncing func(), err error)
}
go syncService.Start()
c.allRooms = roomList
c.syncService = syncService
// track new rooms when they are made
allRoomsListener := newGenericStateListener[[]matrix_sdk_ffi.RoomListEntriesUpdate]()
go func() {
Expand Down
66 changes: 29 additions & 37 deletions tests/notification_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package tests
import (
"fmt"
"math/rand"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -535,35 +534,42 @@ func TestMultiprocessNSEOlmSessionWedge(t *testing.T) {
})
}

func TestMultiprocessDupeOTKUpload(t *testing.T) {
// Test that the notification client doesn't cause duplicate OTK uploads.
// Regression test for https://github.com/matrix-org/matrix-rust-sdk/issues/1415
//
// This test creates a normal Alice rust client and lets it upload OTKs. It then:
// - hooks into /keys/upload requests and artificially delays them by adding 4s of latency
// - creates a Bob client who sends a message to Alice, consuming 1 OTK in the process
// - immediately calls GetNotification on Bob's event as soon as it 200 OKs, which creates
// a NotificationClient inside the same process.
//
// This means there are 2 sync loops: the main Client and the NotificationClient. Both clients
// will see the OTK count being lowered so both may try to upload a new OTK. Because we are
// delaying upload requests by 4s, this increases the chance of both uploads being in-flight at
// the same time. If they do not synchronise this operation, they will both try to upload
// _different keys_ with the _same_ key ID, which causes synapse to HTTP 400 with:
//
// > One time key signed_curve25519:AAAAAAAAADI already exists
//
// Which will fail the test.
func TestNotificationClientDupeOTKUpload(t *testing.T) {
if !Instance().ShouldTest(api.ClientTypeRust) {
t.Skipf("rust only")
return
}
t.Skipf("skipped until it is no longer flakey")
tc, roomID := createAndJoinRoom(t)

// start the "main" app
alice := tc.MustLoginClient(t, &cc.ClientCreationRequest{
User: tc.Alice,
Opts: api.ClientCreationOpts{
PersistentStorage: true,
EnableCrossProcessRefreshLockProcessName: "main",
PersistentStorage: true,
},
})
stopSyncing := alice.MustStartSyncing(t)
defer stopSyncing()
aliceAccessToken := alice.Opts().AccessToken

// prep nse process
nseAlice := tc.MustCreateClient(t, &cc.ClientCreationRequest{
User: tc.Alice,
Multiprocess: true,
Opts: api.ClientCreationOpts{
PersistentStorage: true,
EnableCrossProcessRefreshLockProcessName: api.ProcessNameNSE,
AccessToken: aliceAccessToken,
},
})

aliceUploadedNewKeys := false
// artificially slow down the HTTP responses, such that we will potentially have 2 in-flight /keys/upload requests
// at once. If the NSE and main apps are talking to each other, they should be using the same key ID + key.
Expand All @@ -588,35 +594,21 @@ func TestMultiprocessDupeOTKUpload(t *testing.T) {
return nil
},
}, func() {
var eventID string
// Bob appears and sends a message, causing Bob to claim one of Alice's OTKs.
// The main app will see this in /sync and then try to upload another OTK, which we will tarpit.
tc.WithClientSyncing(t, &cc.ClientCreationRequest{
User: tc.Bob,
}, func(bob api.Client) {
eventID = bob.SendMessage(t, roomID, "Hello world!")
})
var wg sync.WaitGroup
wg.Add(2)
go func() { // nse process
defer wg.Done()
// wake up NSE process as if it got a push notification. Calling this function
// should cause the NSE process to upload an OTK as it would have seen 1 has been used.
// The NSE and main app must talk to each other to ensure they use the same key.
nseAlice.Logf(t, "GetNotification %s, %s", roomID, eventID)
notif, err := nseAlice.GetNotification(t, roomID, eventID)
eventID := bob.SendMessage(t, roomID, "Hello world!")
// create a NotificationClient in the same process to fetch this "push notification".
// It might make the NotificationClient upload an OTK as it would have seen 1 has been used.
// The NotificationClient and main Client must talk to each other to ensure they use the same key.
alice.Logf(t, "GetNotification %s, %s", roomID, eventID)
notif, err := alice.GetNotification(t, roomID, eventID)
must.NotError(t, "failed to get notification", err)
must.Equal(t, notif.Text, "Hello world!", "failed to decrypt msg body")
must.Equal(t, notif.FailedToDecrypt, false, "FailedToDecrypt but we should be able to decrypt")
}()
go func() { // app process
defer wg.Done()
stopSyncing := alice.MustStartSyncing(t)
// let alice upload new OTK
time.Sleep(5 * time.Second)
stopSyncing()
}()
wg.Wait()
})
})
if !aliceUploadedNewKeys {
t.Errorf("Alice did not upload new OTKs")
Expand Down

0 comments on commit 529b1da

Please sign in to comment.