Skip to content

Commit

Permalink
Add MustStartSyncing to allow StartSyncing to return errors
Browse files Browse the repository at this point in the history
Add groundwork for persisting databases in JS clients.
  • Loading branch information
kegsay committed Jan 16, 2024
1 parent d70182b commit 8c0ed0b
Show file tree
Hide file tree
Showing 10 changed files with 349 additions and 149 deletions.
20 changes: 17 additions & 3 deletions internal/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,16 @@ type Client interface {
// Remove any persistent storage, if it was enabled.
DeletePersistentStorage(t Test)
Login(t Test, opts ClientCreationOpts) error
// MustStartSyncing to begin syncing from sync v2 / sliding sync.
// Tests should call stopSyncing() at the end of the test.
// MUST BLOCK until the initial sync is complete.
// Fails the test if there was a problem syncing.
MustStartSyncing(t Test) (stopSyncing func())
// StartSyncing to begin syncing from sync v2 / sliding sync.
// Tests should call stopSyncing() at the end of the test.
// MUST BLOCK until the initial sync is complete.
StartSyncing(t Test) (stopSyncing func())
// Returns an error if there was a problem syncing.
StartSyncing(t Test) (stopSyncing func(), err error)
// IsRoomEncrypted returns true if the room is encrypted. May return an error e.g if you
// provide a bogus room ID.
IsRoomEncrypted(t Test, roomID string) (bool, error)
Expand Down Expand Up @@ -76,10 +82,18 @@ func (c *LoggedClient) Close(t Test) {
c.Client.Close(t)
}

func (c *LoggedClient) StartSyncing(t Test) (stopSyncing func()) {
func (c *LoggedClient) MustStartSyncing(t Test) (stopSyncing func()) {
t.Helper()
c.Logf(t, "%s MustStartSyncing starting to sync", c.logPrefix())
stopSyncing = c.Client.MustStartSyncing(t)
c.Logf(t, "%s MustStartSyncing now syncing", c.logPrefix())
return
}

func (c *LoggedClient) StartSyncing(t Test) (stopSyncing func(), err error) {
t.Helper()
c.Logf(t, "%s StartSyncing starting to sync", c.logPrefix())
stopSyncing = c.Client.StartSyncing(t)
stopSyncing, err = c.Client.StartSyncing(t)
c.Logf(t, "%s StartSyncing now syncing", c.logPrefix())
return
}
Expand Down
34 changes: 26 additions & 8 deletions internal/api/js/chrome/chrome.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"io/fs"
"net"
"net/http"
"os"
"path/filepath"
"strconv"
"sync"
"time"
Expand Down Expand Up @@ -68,15 +70,27 @@ type Browser struct {
Cancel func()
}

func RunHeadless(onConsoleLog func(s string)) (*Browser, error) {
func RunHeadless(onConsoleLog func(s string), requiresPersistance bool, listenPort int) (*Browser, error) {
ansiRedForeground := "\x1b[31m"
ansiResetForeground := "\x1b[39m"

colorifyError := func(format string, args ...any) {
format = ansiRedForeground + time.Now().Format(time.RFC3339) + " " + format + ansiResetForeground
fmt.Printf(format, args...)
}
ctx, cancel := chromedp.NewContext(context.Background(), chromedp.WithBrowserOption(
opts := chromedp.DefaultExecAllocatorOptions[:]
if requiresPersistance {
os.Mkdir("chromedp", os.ModePerm) // ignore errors to allow repeated runs
wd, _ := os.Getwd()
userDir := filepath.Join(wd, "chromedp")
opts = append(opts,
chromedp.UserDataDir(userDir),
)
fmt.Println(userDir)
}

allocCtx, allocCancel := chromedp.NewExecAllocator(context.Background(), opts...)
ctx, cancel := chromedp.NewContext(allocCtx, chromedp.WithBrowserOption(
chromedp.WithBrowserLogf(colorifyError), chromedp.WithBrowserErrorf(colorifyError), //chromedp.WithBrowserDebugf(log.Printf),
))

Expand Down Expand Up @@ -106,11 +120,11 @@ func RunHeadless(onConsoleLog func(s string)) (*Browser, error) {
wg.Add(1)
mux := &http.ServeMux{}
mux.Handle("/", http.FileServer(http.FS(c)))
srv := &http.Server{
Addr: fmt.Sprintf("127.0.0.1:%d", listenPort),
Handler: mux,
}
startServer := func() {
srv := &http.Server{
Addr: "127.0.0.1:0",
Handler: mux,
}
ln, err := net.Listen("tcp", srv.Addr)
if err != nil {
panic(err)
Expand All @@ -133,8 +147,12 @@ func RunHeadless(onConsoleLog func(s string)) (*Browser, error) {
}

return &Browser{
Ctx: ctx,
Cancel: cancel,
Ctx: ctx,
Cancel: func() {
cancel()
allocCancel()
srv.Close()
},
BaseURL: baseJSURL,
}, nil
}
105 changes: 94 additions & 11 deletions internal/api/js/js.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package js
import (
"encoding/json"
"fmt"
"net/url"
"os"
"strconv"
"strings"
"sync/atomic"
"time"
Expand All @@ -15,6 +17,15 @@ import (

const CONSOLE_LOG_CONTROL_STRING = "CC:" // for "complement-crypto"

const (
indexedDBName = "complement-crypto"
indexedDBCryptoName = "complement-crypto:crypto"
)

// For clients which want persistent storage, we need to ensure when the browser
// starts up a 2nd+ time we serve the same URL so the browser uses the same origin
var userDeviceToPort = map[string]int{}

var logFile *os.File

func SetupJSLogs(filename string) {
Expand Down Expand Up @@ -50,6 +61,7 @@ func NewJSClient(t api.Test, opts api.ClientCreationOpts) (api.Client, error) {
listeners: make(map[int32]func(roomID string, ev api.Event)),
userID: opts.UserID,
}
portKey := opts.UserID + opts.DeviceID
browser, err := chrome.RunHeadless(func(s string) {
// TODO: debug mode only?
writeToLog("[%s,%s] console.log %s\n", jsc.browser.BaseURL, opts.UserID, s)
Expand All @@ -67,24 +79,52 @@ func NewJSClient(t api.Test, opts api.ClientCreationOpts) (api.Client, error) {
l(segs[0], jsToEvent(ev))
}
}
})
}, opts.PersistentStorage, userDeviceToPort[portKey])
if err != nil {
return nil, fmt.Errorf("failed to RunHeadless: %s", err)
}
jsc.browser = browser
/*
chrome.MustRunAsyncFn[chrome.Void](t, browser.Ctx, `
const databases = await indexedDB.databases();
console.log("====STARTUP=============== idb " + JSON.stringify(databases));
console.log("=================== localstorage len", window.localStorage.length);
`) */

// now login
deviceID := "undefined"
if opts.DeviceID != "" {
deviceID = `"` + opts.DeviceID + `"`
}
store := "undefined"
cryptoStore := "undefined"
if opts.PersistentStorage {
store = fmt.Sprintf(`new IndexedDBStore({
indexedDB: window.indexedDB,
dbName: "%s",
localStorage: window.localStorage,
})`, indexedDBName)
cryptoStore = fmt.Sprintf(`new IndexedDBCryptoStore(indexedDB, "%s")`, indexedDBCryptoName)
// remember the port for same-origin to remember the store
u, _ := url.Parse(browser.BaseURL)
portStr := u.Port()
port, err := strconv.Atoi(portStr)
if portStr == "" || err != nil {
api.Fatalf(t, "failed to extract port from base url %s", browser.BaseURL)
}
userDeviceToPort[portKey] = port
t.Logf("user=%s device=%s will be served from %s due to persistent storage", opts.UserID, opts.DeviceID, browser.BaseURL)
}

chrome.MustRunAsyncFn[chrome.Void](t, browser.Ctx, fmt.Sprintf(`
window._secretStorageKeys = {};
window.__client = matrix.createClient({
baseUrl: "%s",
"useAuthorizationHeader": %s,
"userId": "%s",
"deviceId": %s,
useAuthorizationHeader: %s,
userId: "%s",
deviceId: %s,
store: %s,
cryptoStore: %s,
cryptoCallbacks: {
cacheSecretStorageKey: (keyId, keyInfo, key) => {
console.log("cacheSecretStorageKey: keyId="+keyId+" keyInfo="+JSON.stringify(keyInfo)+" key.length:"+key.length);
Expand All @@ -109,7 +149,7 @@ func NewJSClient(t api.Test, opts api.ClientCreationOpts) (api.Client, error) {
return Promise.resolve(result);
},
}
});`, opts.BaseURL, "true", opts.UserID, deviceID))
});`, opts.BaseURL, "true", opts.UserID, deviceID, store, cryptoStore))

return &api.LoggedClient{Client: jsc}, nil
}
Expand Down Expand Up @@ -142,14 +182,45 @@ func (c *JSClient) Login(t api.Test, opts api.ClientCreationOpts) error {

func (c *JSClient) DeletePersistentStorage(t api.Test) {
t.Helper()
// TODO
chrome.MustRunAsyncFn[chrome.Void](t, c.browser.Ctx, fmt.Sprintf(`
window.localStorage.clear();
window.sessionStorage.clear();
const dbName = "%s";
await new Promise((resolve, reject) => {
const req = window.indexedDB.deleteDatabase(dbName);
req.onerror = (event) => {
reject("failed to delete " + dbName + ": " + event);
};
req.onsuccess = (event) => {
console.log(dbName + " deleted successfully");
resolve();
};
});
const cryptoDBName = "%s";
await new Promise((resolve, reject) => {
const req = window.indexedDB.deleteDatabase(cryptoDBName);
req.onerror = (event) => {
reject("failed to delete " + cryptoDBName + ": " + event);
};
req.onsuccess = (event) => {
console.log(cryptoDBName + " deleted successfully");
resolve();
};
});
`, indexedDBName, indexedDBCryptoName))
}

// Close is called to clean up resources.
// Specifically, we need to shut off existing browsers and any FFI bindings.
// If we get callbacks/events after this point, tests may panic if the callbacks
// log messages.
func (c *JSClient) Close(t api.Test) {
/*
chrome.MustRunAsyncFn[chrome.Void](t, c.browser.Ctx, `
const databases = await indexedDB.databases();
console.log("====CLOSE======= idb " + JSON.stringify(databases));
console.log("=================== localstorage len", window.localStorage.length);
`) */
c.browser.Cancel()
c.listeners = make(map[int32]func(roomID string, ev api.Event))
}
Expand Down Expand Up @@ -198,11 +269,18 @@ func (c *JSClient) MustGetEvent(t api.Test, roomID, eventID string) api.Event {
return ev
}

func (c *JSClient) MustStartSyncing(t api.Test) (stopSyncing func()) {
t.Helper()
stopSyncing, err := c.StartSyncing(t)
api.MustNotError(t, "StartSyncing", err)
return stopSyncing
}

// StartSyncing to begin syncing from sync v2 / sliding sync.
// Tests should call stopSyncing() at the end of the test.
func (c *JSClient) StartSyncing(t api.Test) (stopSyncing func()) {
func (c *JSClient) StartSyncing(t api.Test) (stopSyncing func(), err error) {
t.Helper()
chrome.MustRunAsyncFn[chrome.Void](t, c.browser.Ctx, fmt.Sprintf(`
_, err = chrome.RunAsyncFn[chrome.Void](t, c.browser.Ctx, fmt.Sprintf(`
var fn;
fn = function(state) {
if (state !== "SYNCING") {
Expand All @@ -212,6 +290,9 @@ func (c *JSClient) StartSyncing(t api.Test) (stopSyncing func()) {
window.__client.off("sync", fn);
};
window.__client.on("sync", fn);`, CONSOLE_LOG_CONTROL_STRING))
if err != nil {
return nil, fmt.Errorf("[%s]failed to listen for sync callback: %s", c.userID, err)
}
ch := make(chan struct{})
cancel := c.listenForUpdates(func(roomID string, ev api.Event) {
if roomID != "sync" {
Expand All @@ -222,7 +303,7 @@ func (c *JSClient) StartSyncing(t api.Test) (stopSyncing func()) {
chrome.RunAsyncFn[chrome.Void](t, c.browser.Ctx, `await window.__client.startClient({});`)
select {
case <-time.After(5 * time.Second):
api.Fatalf(t, "[%s](js) took >5s to StartSyncing", c.userID)
return nil, fmt.Errorf("[%s](js) took >5s to StartSyncing", c.userID)
case <-ch:
}
cancel()
Expand All @@ -232,7 +313,7 @@ func (c *JSClient) StartSyncing(t api.Test) (stopSyncing func()) {
time.Sleep(500 * time.Millisecond)
return func() {
chrome.RunAsyncFn[chrome.Void](t, c.browser.Ctx, `await window.__client.stopClient();`)
}
}, nil
}

// IsRoomEncrypted returns true if the room is encrypted. May return an error e.g if you
Expand Down Expand Up @@ -327,7 +408,9 @@ func (c *JSClient) WaitUntilEventInRoom(t api.Test, roomID string, checker func(
func (c *JSClient) Logf(t api.Test, format string, args ...interface{}) {
t.Helper()
formatted := fmt.Sprintf(t.Name()+": "+format, args...)
chrome.MustRunAsyncFn[chrome.Void](t, c.browser.Ctx, fmt.Sprintf(`console.log("%s");`, formatted))
if c.browser.Ctx.Err() == nil { // don't log on dead browsers
chrome.MustRunAsyncFn[chrome.Void](t, c.browser.Ctx, fmt.Sprintf(`console.log("%s");`, formatted))
}
t.Logf(format, args...)
}

Expand Down
25 changes: 19 additions & 6 deletions internal/api/rust/rust.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,17 +120,30 @@ func (c *RustClient) MustGetEvent(t api.Test, roomID, eventID string) api.Event
return *ev
}

func (c *RustClient) MustStartSyncing(t api.Test) (stopSyncing func()) {
t.Helper()
stopSyncing, err := c.StartSyncing(t)
api.MustNotError(t, "StartSyncing", err)
return stopSyncing
}

// StartSyncing to begin syncing from sync v2 / sliding sync.
// Tests should call stopSyncing() at the end of the test.
func (c *RustClient) StartSyncing(t api.Test) (stopSyncing func()) {
func (c *RustClient) StartSyncing(t api.Test) (stopSyncing func(), err error) {
t.Helper()
syncService, err := c.FFIClient.SyncService().Finish()
api.MustNotError(t, fmt.Sprintf("[%s]failed to make sync service", c.userID), err)
if err != nil {
return nil, fmt.Errorf("[%s]failed to make sync service: %s", c.userID, err)
}
roomList, err := syncService.RoomListService().AllRooms()
api.MustNotError(t, "failed to call SyncService.RoomListService.AllRooms", err)
if err != nil {
return nil, fmt.Errorf("[%s]failed to call SyncService.RoomListService.AllRooms: %s", c.userID, err)
}
genericListener := newGenericStateListener[matrix_sdk_ffi.RoomListLoadingState]()
result, err := roomList.LoadingState(genericListener)
api.MustNotError(t, "failed to call RoomList.LoadingState", err)
if err != nil {
return nil, fmt.Errorf("[%s]failed to call RoomList.LoadingState: %s", c.userID, err)
}
go syncService.Start()
c.allRooms = roomList

Expand All @@ -139,7 +152,7 @@ func (c *RustClient) StartSyncing(t api.Test) (stopSyncing func()) {
for !isSyncing {
select {
case <-time.After(5 * time.Second):
api.Fatalf(t, "[%s](rust) timed out after 5s StartSyncing", c.userID)
return nil, fmt.Errorf("[%s](rust) timed out after 5s StartSyncing", c.userID)
case state := <-genericListener.ch:
switch state.(type) {
case matrix_sdk_ffi.RoomListLoadingStateLoaded:
Expand All @@ -156,7 +169,7 @@ func (c *RustClient) StartSyncing(t api.Test) (stopSyncing func()) {
return func() {
t.Logf("%s: Stopping sync service", c.userID)
syncService.Stop()
}
}, nil
}

// IsRoomEncrypted returns true if the room is encrypted. May return an error e.g if you
Expand Down
Loading

0 comments on commit 8c0ed0b

Please sign in to comment.