diff --git a/internal/api/rust/rust.go b/internal/api/rust/rust.go index c6289df..d707822 100644 --- a/internal/api/rust/rust.go +++ b/internal/api/rust/rust.go @@ -30,7 +30,7 @@ func SetupLogs(prefix string) { // log new files matrix_sdk_ffi.SetupTracing(matrix_sdk_ffi.TracingConfiguration{ WriteToStdoutOrSystem: false, - Filter: "debug,hyper=warn,log=warn,eyeball=warn", //,matrix_sdk_ffi=trace,matrix_sdk=trace,matrix_sdk_crypto=trace,matrix_sdk_base=trace,matrix_sdk_ui=trace", + Filter: "debug,hyper=warn,log=warn,eyeball=warn,matrix_sdk_common=trace", //,matrix_sdk_ffi=trace,matrix_sdk=trace,matrix_sdk_crypto=trace,matrix_sdk_base=trace,matrix_sdk_ui=trace", WriteToFiles: &matrix_sdk_ffi.TracingFileConfiguration{ Path: "./logs", FilePrefix: prefix, @@ -253,6 +253,11 @@ func (c *RustClient) StartSyncing(t ct.TestLike) (stopSyncing func(), err error) // > thread '' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime' // where the stack trace doesn't hit any test code, but does start at a `free_` function. sb := c.FFIClient.SyncService() + if c.opts.EnableCrossProcessRefreshLockProcessName != "" { + sb2 := sb.WithCrossProcessLock(&c.opts.EnableCrossProcessRefreshLockProcessName) + sb.Destroy() + sb = sb2 + } defer sb.Destroy() syncService, err := sb.Finish() if err != nil { diff --git a/internal/deploy/rpc_client.go b/internal/deploy/rpc_client.go index 940a27b..1ecd866 100644 --- a/internal/deploy/rpc_client.go +++ b/internal/deploy/rpc_client.go @@ -17,14 +17,16 @@ import ( // RPCLanguageBindings implements api.LanguageBindings and instead issues RPC calls to a remote server. type RPCLanguageBindings struct { - binaryPath string - clientType api.ClientTypeLang + binaryPath string + clientType api.ClientTypeLang + contextPrefix string } -func NewRPCLanguageBindings(rpcBinaryPath string, clientType api.ClientTypeLang) (*RPCLanguageBindings, error) { +func NewRPCLanguageBindings(rpcBinaryPath string, clientType api.ClientTypeLang, contextPrefix string) (*RPCLanguageBindings, error) { return &RPCLanguageBindings{ - binaryPath: rpcBinaryPath, - clientType: clientType, + binaryPath: rpcBinaryPath, + clientType: clientType, + contextPrefix: contextPrefix, }, nil } @@ -46,7 +48,7 @@ func (r *RPCLanguageBindings) PostTestRun(contextID string) { // - IPC via stdout fails (used to extract the random high numbered port) // - the client cannot talk to the rpc server func (r *RPCLanguageBindings) MustCreateClient(t ct.TestLike, cfg api.ClientCreationOpts) api.Client { - contextID := fmt.Sprintf("%s_%s", strings.Replace(cfg.UserID[1:], ":", "_", -1), cfg.DeviceID) + contextID := fmt.Sprintf("%s%s_%s", r.contextPrefix, strings.Replace(cfg.UserID[1:], ":", "_", -1), cfg.DeviceID) // security: check it is a file not a random bash script... if _, err := os.Stat(r.binaryPath); err != nil { ct.Fatalf(t, "%s: RPC binary at %s does not exist or cannot be executed/read: %s", contextID, r.binaryPath, err) diff --git a/internal/tests/client_test.go b/internal/tests/client_test.go index 29cf941..9344ccf 100644 --- a/internal/tests/client_test.go +++ b/internal/tests/client_test.go @@ -60,7 +60,7 @@ func TestMain(m *testing.M) { rpcBinary := os.Getenv("COMPLEMENT_CRYPTO_RPC_BINARY") if rpcBinary != "" { clientFactories = append(clientFactories, func(t *testing.T, cfg api.ClientCreationOpts) api.Client { - remoteBindings, err := deploy.NewRPCLanguageBindings(rpcBinary, api.ClientTypeRust) + remoteBindings, err := deploy.NewRPCLanguageBindings(rpcBinary, api.ClientTypeRust, "") if err != nil { log.Fatal(err) } diff --git a/tests/main_test.go b/tests/main_test.go index 1d51a56..60b5acb 100644 --- a/tests/main_test.go +++ b/tests/main_test.go @@ -5,6 +5,7 @@ import ( "os" "path/filepath" "sync" + "sync/atomic" "testing" "github.com/matrix-org/complement" @@ -131,10 +132,19 @@ func WithCrossProcessLock(processName string) func(*api.ClientCreationOpts) { } } +// WithAccessToken is an option which can be provided to MustCreateClient which will configure an access token for the client. +// No-ops on non-rust clients, for now. In theory this option should be generic to configure an already logged in client. TODO +func WithAccessToken(accessToken string) func(*api.ClientCreationOpts) { + return func(o *api.ClientCreationOpts) { + o.AccessToken = accessToken + } +} + // TestContext provides a consistent set of variables which most tests will need access to. type TestContext struct { Deployment *deploy.SlidingSyncDeployment RPCBinaryPath string + RPCInstance atomic.Int32 // Alice is defined if at least 1 clientType is provided to CreateTestContext. Alice *client.CSAPI AliceClientType api.ClientType @@ -205,7 +215,8 @@ func (c *TestContext) MustCreateMultiprocessClient(t *testing.T, lang api.Client t.Skipf("RPC binary path not provided, skipping multiprocess test. To run this test, set COMPLEMENT_CRYPTO_RPC_BINARY") return nil } - remoteBindings, err := deploy.NewRPCLanguageBindings(c.RPCBinaryPath, lang) + ctxPrefix := fmt.Sprintf("%d", c.RPCInstance.Add(1)) + remoteBindings, err := deploy.NewRPCLanguageBindings(c.RPCBinaryPath, lang, ctxPrefix) if err != nil { t.Fatalf("Failed to create new RPC language bindings: %s", err) } diff --git a/tests/notification_test.go b/tests/notification_test.go index d09a140..3c4a75d 100644 --- a/tests/notification_test.go +++ b/tests/notification_test.go @@ -2,10 +2,13 @@ package tests import ( "fmt" + "math/rand" + "sync" "testing" "time" "github.com/matrix-org/complement-crypto/internal/api" + "github.com/matrix-org/complement-crypto/internal/deploy" "github.com/matrix-org/complement/must" ) @@ -65,7 +68,7 @@ func testNSEReceive(t *testing.T, numMsgsBefore, numMsgsAfter int) { tc, roomID := createAndJoinRoom(t) // login as Alice (uploads OTKs/device keys) and remember the access token for NSE - alice := tc.MustLoginClient(t, tc.Alice, tc.AliceClientType, WithPersistentStorage()) + alice := tc.MustLoginClient(t, tc.Alice, tc.AliceClientType, WithPersistentStorage(), WithCrossProcessLock("main")) alice.Logf(t, "syncing and sending dummy message to ensure e2ee keys are uploaded") stopSyncing := alice.MustStartSyncing(t) alice.WaitUntilEventInRoom(t, roomID, api.CheckEventHasMembership(tc.Bob.UserID, "join")).Waitf(t, 5*time.Second, "did not see bob's join") @@ -84,7 +87,7 @@ func testNSEReceive(t *testing.T, numMsgsBefore, numMsgsAfter int) { opts := tc.ClientCreationOpts(t, tc.Alice, tc.AliceClientType.HS, WithPersistentStorage()) opts.EnableCrossProcessRefreshLockProcessName = api.ProcessNameNSE opts.AccessToken = accessToken - client := MustCreateClient(t, tc.AliceClientType, opts) // this should login already as we provided an access token + client := tc.MustCreateMultiprocessClient(t, tc.AliceClientType.Lang, opts) // this should login already as we provided an access token defer client.Close(t) // we don't sync in the NSE process, just call GetNotification notif, err := client.GetNotification(t, roomID, pushNotifEventID) @@ -131,6 +134,374 @@ func TestNSEReceiveForNonPreKeyMessage(t *testing.T) { }) } +// Get an encrypted room set up with keys exchanged, then concurrently receive messages and see if we end up with a wedged +// session. We should see "Crypto store generation mismatch" log lines in rust SDK. +func TestMultiprocessNSE(t *testing.T) { + if !ShouldTest(api.ClientTypeRust) { + t.Skipf("rust only") + return + } + t.Skipf("TODO: skipped until backup bug is fixed") + numPreBackgroundMsgs := 1 + numPostNSEMsgs := 300 + tc, roomID := createAndJoinRoom(t) + // Alice starts syncing to get an encrypted room set up + alice := tc.MustLoginClient(t, tc.Alice, tc.AliceClientType, WithPersistentStorage(), WithCrossProcessLock("main")) + stopSyncing := alice.MustStartSyncing(t) + accessToken := alice.Opts().AccessToken + recoveryKey := alice.MustBackupKeys(t) + var eventTimeline []string + // Bob sends a message to alice + tc.WithClientSyncing(t, tc.BobClientType, tc.Bob, func(bob api.Client) { + // let bob realise alice exists and claims keys + time.Sleep(time.Second) + for i := 0; i < numPreBackgroundMsgs; i++ { + msg := fmt.Sprintf("numPreBackgroundMsgs %d", i) + bob.SendMessage(t, roomID, msg) + alice.WaitUntilEventInRoom(t, roomID, api.CheckEventHasBody(msg)).Waitf(t, 5*time.Second, "alice did not see '%s'", msg) + } + + stopAliceSyncing := func() { + if alice == nil { + t.Fatalf("stopAliceSyncing: alice was already not syncing") + } + alice.Close(t) + stopSyncing() + alice = nil + } + startAliceSyncing := func() { + if alice != nil { + t.Fatalf("startAliceSyncing: alice was already syncing") + } + alice = MustCreateClient(t, tc.AliceClientType, tc.ClientCreationOpts(t, tc.Alice, tc.AliceClientType.HS, + WithPersistentStorage(), WithAccessToken(accessToken), WithCrossProcessLock("main"), + )) // this should login already as we provided an access token + stopSyncing = alice.MustStartSyncing(t) + } + checkNSECanDecryptEvent := func(nseAlice api.Client, roomID, eventID, msg string) { + notif, err := nseAlice.GetNotification(t, roomID, eventID) + must.NotError(t, fmt.Sprintf("failed to get notification for event %s '%s'", eventID, msg), err) + must.Equal(t, notif.Text, msg, fmt.Sprintf("NSE failed to decrypt event %s '%s' => %+v", eventID, msg, notif)) + } + + // set up the nse process. It doesn't actively keep a sync loop so we don't need to do the close dance with it. + nseAlice := tc.MustCreateMultiprocessClient(t, tc.AliceClientType.Lang, tc.ClientCreationOpts(t, tc.Alice, tc.AliceClientType.HS, + WithPersistentStorage(), WithAccessToken(accessToken), WithCrossProcessLock(api.ProcessNameNSE), + )) // this should login already as we provided an access token + + randomSource := rand.NewSource(2) // static seed for determinism + + // now bob will send lots of messages + for i := 0; i < numPostNSEMsgs; i++ { + if t.Failed() { + t.Logf("bailing at iteration %d", i) + break + } + // we want to emulate the handover of the lock between NSE and the App process. + // For this to happen, we need decryption failures to happen on both processes. + // If we always keep the main App process syncing, we will never see decryption failures on the NSE process. + // We want to randomise this for maximum effect. + restartAlice := randomSource.Int63()%2 == 0 + restartNSE := randomSource.Int63()%2 == 0 + nseOpensFirst := randomSource.Int63()%2 == 0 + aliceSendsMsg := randomSource.Int63()%2 == 0 + t.Logf("iteration %d restart app=%v nse=%v nse_open_first=%v alice_sends=%v", i, restartAlice, restartNSE, nseOpensFirst, aliceSendsMsg) + if restartAlice { + stopAliceSyncing() + } + if restartNSE { + nseAlice.Close(t) + } + msg := fmt.Sprintf("numPostNSEMsgs %d", i) + eventID := bob.SendMessage(t, roomID, msg) + eventTimeline = append(eventTimeline, eventID) + t.Logf("event %s => '%s'", eventID, msg) + if restartNSE { // a new NSE process is created as a result of bob's message + nseAlice = tc.MustCreateMultiprocessClient(t, tc.AliceClientType.Lang, tc.ClientCreationOpts(t, tc.Alice, tc.AliceClientType.HS, + WithPersistentStorage(), WithAccessToken(accessToken), WithCrossProcessLock(api.ProcessNameNSE), + )) + } // else we reuse the same NSE process for bob's message + + // both the nse process and the app process should be able to decrypt the event + if nseOpensFirst { + checkNSECanDecryptEvent(nseAlice, roomID, eventID, msg) + } + if restartAlice { + t.Logf("restarting alice") + startAliceSyncing() + } + if aliceSendsMsg { // this will cause the main app to update the crypto store + sentEventID := alice.SendMessage(t, roomID, "dummy") + eventTimeline = append(eventTimeline, sentEventID) + } + if !nseOpensFirst { + checkNSECanDecryptEvent(nseAlice, roomID, eventID, msg) + } + + alice.WaitUntilEventInRoom(t, roomID, api.CheckEventHasBody(msg)).Waitf(t, 5*time.Second, "alice did not decrypt '%s'", msg) + } + + // let keys be backed up + time.Sleep(time.Second) + nseAlice.Close(t) + stopAliceSyncing() + }) + + // do a new login to alice and use the recovery key + newDevice := tc.MustRegisterNewDevice(t, tc.Alice, tc.AliceClientType.HS, "RESTORE") + alice2 := tc.MustLoginClient(t, newDevice, tc.AliceClientType, WithPersistentStorage(), WithCrossProcessLock("main")) + alice2.MustLoadBackup(t, recoveryKey) + stopSyncing = alice2.MustStartSyncing(t) + defer stopSyncing() + // scrollback all the messages and check we can read them + alice2.MustBackpaginate(t, roomID, len(eventTimeline)) + time.Sleep(time.Second) + for _, eventID := range eventTimeline { + ev := alice2.MustGetEvent(t, roomID, eventID) + must.Equal(t, ev.FailedToDecrypt, false, fmt.Sprintf("failed to decrypt event ID %s : %+v", eventID, ev)) + } +} + +func TestMultiprocessNSEBackupKeyMacError(t *testing.T) { + if !ShouldTest(api.ClientTypeRust) { + t.Skipf("rust only") + return + } + tc, roomID := createAndJoinRoom(t) + // Alice starts syncing to get an encrypted room set up + alice := tc.MustLoginClient(t, tc.Alice, tc.AliceClientType, WithPersistentStorage(), WithCrossProcessLock("main")) + stopSyncing := alice.MustStartSyncing(t) + accessToken := alice.Opts().AccessToken + recoveryKey := alice.MustBackupKeys(t) + var eventTimeline []string + + // Bob sends a message to alice + tc.WithClientSyncing(t, tc.BobClientType, tc.Bob, func(bob api.Client) { + // let bob realise alice exists and claims keys + time.Sleep(time.Second) + + stopAliceSyncing := func() { + if alice == nil { + t.Fatalf("stopAliceSyncing: alice was already not syncing") + } + alice.Close(t) + stopSyncing() + alice = nil + } + startAliceSyncing := func() { + if alice != nil { + t.Fatalf("startAliceSyncing: alice was already syncing") + } + alice = MustCreateClient(t, tc.AliceClientType, tc.ClientCreationOpts(t, tc.Alice, tc.AliceClientType.HS, + WithPersistentStorage(), WithAccessToken(accessToken), WithCrossProcessLock("main"), + )) // this should login already as we provided an access token + stopSyncing = alice.MustStartSyncing(t) + } + checkNSECanDecryptEvent := func(nseAlice api.Client, roomID, eventID, msg string) { + notif, err := nseAlice.GetNotification(t, roomID, eventID) + must.NotError(t, fmt.Sprintf("failed to get notification for event %s '%s'", eventID, msg), err) + must.Equal(t, notif.Text, msg, fmt.Sprintf("NSE failed to decrypt event %s '%s' => %+v", eventID, msg, notif)) + } + + // set up the nse process. It doesn't actively keep a sync loop so we don't need to do the close dance with it. + nseAlice := tc.MustCreateMultiprocessClient(t, tc.AliceClientType.Lang, tc.ClientCreationOpts(t, tc.Alice, tc.AliceClientType.HS, + WithPersistentStorage(), WithAccessToken(accessToken), WithCrossProcessLock(api.ProcessNameNSE), + )) // this should login already as we provided an access token + + msg := "first message" + eventID := bob.SendMessage(t, roomID, msg) + eventTimeline = append(eventTimeline, eventID) + t.Logf("first event %s => '%s'", eventID, msg) + checkNSECanDecryptEvent(nseAlice, roomID, eventID, msg) + alice.WaitUntilEventInRoom(t, roomID, api.CheckEventHasBody(msg)).Waitf(t, 5*time.Second, "alice did not decrypt '%s'", msg) + + // restart alice but keep nse process around + stopAliceSyncing() + + // send final message + msg = "final message" + eventID = bob.SendMessage(t, roomID, msg) + eventTimeline = append(eventTimeline, eventID) + t.Logf("final event %s => '%s'", eventID, msg) + + // both the nse process and the app process should be able to decrypt the event + checkNSECanDecryptEvent(nseAlice, roomID, eventID, msg) + + t.Logf("restarting alice") + startAliceSyncing() + alice.WaitUntilEventInRoom(t, roomID, api.CheckEventHasBody(msg)).Waitf(t, 5*time.Second, "alice did not decrypt '%s'", msg) + + // let keys be backed up + time.Sleep(time.Second) + nseAlice.Close(t) + stopAliceSyncing() + }) + + // do a new login to alice and use the recovery key + newDevice := tc.MustRegisterNewDevice(t, tc.Alice, tc.AliceClientType.HS, "RESTORE") + alice2 := tc.MustLoginClient(t, newDevice, tc.AliceClientType, WithPersistentStorage(), WithCrossProcessLock("main")) + alice2.MustLoadBackup(t, recoveryKey) + stopSyncing = alice2.MustStartSyncing(t) + defer stopSyncing() + // scrollback all the messages and check we can read them + alice2.MustBackpaginate(t, roomID, len(eventTimeline)) + time.Sleep(time.Second) + for _, eventID := range eventTimeline { + ev := alice2.MustGetEvent(t, roomID, eventID) + must.Equal(t, ev.FailedToDecrypt, false, fmt.Sprintf("failed to decrypt event using key from backup event ID %s : %+v", eventID, ev)) + } +} + +func TestMultiprocessNSEOlmSessionWedge(t *testing.T) { + if !ShouldTest(api.ClientTypeRust) { + t.Skipf("rust only") + return + } + tc, roomID := createAndJoinRoom(t) + // Alice starts syncing to get an encrypted room set up + alice := tc.MustLoginClient(t, tc.Alice, tc.AliceClientType, WithPersistentStorage(), WithCrossProcessLock("main")) + stopSyncing := alice.MustStartSyncing(t) + accessToken := alice.Opts().AccessToken + // Bob sends a message to alice + tc.WithClientSyncing(t, tc.BobClientType, tc.Bob, func(bob api.Client) { + // let bob realise alice exists and claims keys + time.Sleep(time.Second) + msg := "pre message" + bob.SendMessage(t, roomID, msg) + alice.WaitUntilEventInRoom(t, roomID, api.CheckEventHasBody(msg)).Waitf(t, 5*time.Second, "alice did not see '%s'", msg) + + stopAliceSyncing := func() { + t.Helper() + if alice == nil { + t.Fatalf("stopAliceSyncing: alice was already not syncing") + } + alice.Close(t) + stopSyncing() + alice = nil + } + startAliceSyncing := func() { + t.Helper() + if alice != nil { + t.Fatalf("startAliceSyncing: alice was already syncing") + } + alice = MustCreateClient(t, tc.AliceClientType, tc.ClientCreationOpts(t, tc.Alice, tc.AliceClientType.HS, + WithPersistentStorage(), WithAccessToken(accessToken), WithCrossProcessLock("main"), + )) // this should login already as we provided an access token + stopSyncing = alice.MustStartSyncing(t) + } + checkNSECanDecryptEvent := func(nseAlice api.Client, roomID, eventID, msg string) { + t.Helper() + notif, err := nseAlice.GetNotification(t, roomID, eventID) + must.NotError(t, fmt.Sprintf("failed to get notification for event %s '%s'", eventID, msg), err) + must.Equal(t, notif.Text, msg, fmt.Sprintf("NSE failed to decrypt event %s '%s' => %+v", eventID, msg, notif)) + t.Logf("notif %+v", notif) + } + + // set up the nse process. It doesn't actively keep a sync loop so we don't need to do the close dance with it. + // Note we do not restart the NSE process in this test. This matches reality where the NSE process is often used + // to process multiple push notifs one after the other. + nseAlice := tc.MustCreateMultiprocessClient(t, tc.AliceClientType.Lang, tc.ClientCreationOpts(t, tc.Alice, tc.AliceClientType.HS, + WithPersistentStorage(), WithAccessToken(accessToken), WithCrossProcessLock(api.ProcessNameNSE), + )) // this should login already as we provided an access token + + stopAliceSyncing() + msg = fmt.Sprintf("test message %d", 1) + eventID := bob.SendMessage(t, roomID, msg) + t.Logf("event %s => '%s'", eventID, msg) + + // both the nse process and the app process should be able to decrypt the event. + // NSE goes first (as it's the push notif process) + checkNSECanDecryptEvent(nseAlice, roomID, eventID, msg) + t.Logf("restarting alice") + nseAlice.Logf(t, "post checkNSECanDecryptEvent") + startAliceSyncing() + alice.SendMessage(t, roomID, "dummy") + + // iteration 2 + stopAliceSyncing() + msg = fmt.Sprintf("test message %d", 2) + eventID = bob.SendMessage(t, roomID, msg) + t.Logf("event %s => '%s'", eventID, msg) + + // both the nse process and the app process should be able to decrypt the event. + // NSE goes first (as it's the push notif process) + checkNSECanDecryptEvent(nseAlice, roomID, eventID, msg) + t.Logf("restarting alice") + startAliceSyncing() + alice.SendMessage(t, roomID, "dummy") + + nseAlice.Close(t) + stopAliceSyncing() + }) +} + +func TestMultiprocessDupeOTKUpload(t *testing.T) { + if !ShouldTest(api.ClientTypeRust) { + t.Skipf("rust only") + return + } + tc, roomID := createAndJoinRoom(t) + + // start the "main" app + alice := tc.MustLoginClient(t, tc.Alice, tc.AliceClientType, WithPersistentStorage(), WithCrossProcessLock("main")) + aliceAccessToken := alice.Opts().AccessToken + + // prep nse process + nseAlice := tc.MustCreateMultiprocessClient(t, tc.AliceClientType.Lang, tc.ClientCreationOpts(t, tc.Alice, tc.AliceClientType.HS, + WithPersistentStorage(), WithAccessToken(aliceAccessToken), WithCrossProcessLock(api.ProcessNameNSE), + )) + + aliceUploadedNewKeys := false + // artificially slow down the HTTP responses, such that we will potentially have 2 in-flight /keys/upload requests + // at once. If the NSE and main apps are talking to each other, they should be using the same key ID + key. + // If not... well, that's a bug because then the client will forget one of these keys. + tc.Deployment.WithSniffedEndpoint(t, "/keys/upload", func(cd deploy.CallbackData) { + if cd.AccessToken != aliceAccessToken { + return // let bob upload OTKs + } + aliceUploadedNewKeys = true + if cd.ResponseCode != 200 { + // we rely on the homeserver checking and rejecting when the same key ID is used with + // different keys. + t.Errorf("/keys/upload returned an error, duplicate key upload? %+v => %v", cd, string(cd.ResponseBody)) + } + // tarpit the response + t.Logf("tarpitting keys/upload response for 4 seconds") + time.Sleep(4 * time.Second) + }, func() { + var eventID string + // Bob appears and sends a message, causing Bob to claim one of Alice's OTKs. + // The main app will see this in /sync and then try to upload another OTK, which we will tarpit. + tc.WithClientSyncing(t, tc.BobClientType, tc.Bob, func(bob api.Client) { + eventID = bob.SendMessage(t, roomID, "Hello world!") + }) + var wg sync.WaitGroup + wg.Add(2) + go func() { // nse process + defer wg.Done() + // wake up NSE process as if it got a push notification. Calling this function + // should cause the NSE process to upload a OTK as it would have seen 1 has been used. + // The NSE and main app must talk to each other to ensure they use the same key. + nseAlice.Logf(t, "GetNotification %s, %s", roomID, eventID) + notif, err := nseAlice.GetNotification(t, roomID, eventID) + must.NotError(t, "failed to get notification", err) + must.Equal(t, notif.Text, "Hello world!", "failed to decrypt msg body") + must.Equal(t, notif.FailedToDecrypt, false, "FailedToDecrypt but we should be able to decrypt") + }() + go func() { // app process + defer wg.Done() + stopSyncing := alice.MustStartSyncing(t) + // let alice upload new OTK + time.Sleep(5 * time.Second) + stopSyncing() + }() + wg.Wait() + }) + if !aliceUploadedNewKeys { + t.Errorf("Alice did not upload new OTKs") + } +} + func createAndJoinRoom(t *testing.T) (tc *TestContext, roomID string) { t.Helper() clientType := api.ClientType{ @@ -143,6 +514,10 @@ func createAndJoinRoom(t *testing.T) (tc *TestContext, roomID string) { tc.Alice, EncRoomOptions.PresetTrustedPrivateChat(), EncRoomOptions.Invite([]string{tc.Bob.UserID}), + // purposefully low rotation period to force room keys to be updated more frequently. + // Wedged olm sessions can only happen when we send olm messages, which only happens + // when we send new room keys! + EncRoomOptions.RotationPeriodMsgs(1), ) tc.Bob.MustJoinRoom(t, roomID, []string{clientType.HS}) return