diff --git a/CHANGELOG.md b/CHANGELOG.md index 29d56b0b0a..4a7ef40fab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,9 +6,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ## StreamChat ### ⚡ Performance - Use background database observers for `CurrentUserController.currentUser`, `ChatChannelMemberListController.members`, and `ChatMessageController.message` which reduces CPU usage on the main thread [#3357](https://github.com/GetStream/stream-chat-swift/pull/3357) +- Reduce latency of the `BackgroundDatabaseObserver` by reducing thread switching when handling changes [#3373](https://github.com/GetStream/stream-chat-swift/pull/3373) ### 🐞 Fixed - Fix rare crashes when deleting local database content on logout [#3355](https://github.com/GetStream/stream-chat-swift/pull/3355) - Fix rare crashes in `MulticastDelegate` when accessing it concurrently [#3361](https://github.com/GetStream/stream-chat-swift/pull/3361) +- Fix reading the local database state just after the initial write [#3373](https://github.com/GetStream/stream-chat-swift/pull/3373) ### 🔄 Changed - Made loadBlockedUsers in ConnectedUser public [#3352](https://github.com/GetStream/stream-chat-swift/pull/3352) - Removed Agora and 100ms related code, the recommended way to support calls is to use StreamVideo [#3364](https://github.com/GetStream/stream-chat-swift/pull/3364) diff --git a/Sources/StreamChat/Controllers/DatabaseObserver/BackgroundDatabaseObserver.swift b/Sources/StreamChat/Controllers/DatabaseObserver/BackgroundDatabaseObserver.swift index 9e72f3a53f..8cce4c6373 100644 --- a/Sources/StreamChat/Controllers/DatabaseObserver/BackgroundDatabaseObserver.swift +++ b/Sources/StreamChat/Controllers/DatabaseObserver/BackgroundDatabaseObserver.swift @@ -28,37 +28,32 @@ class BackgroundDatabaseObserver { /// When called, notification observers are released var releaseNotificationObservers: (() -> Void)? - private let queue = DispatchQueue(label: "io.getstream.list-database-observer", qos: .userInitiated, attributes: .concurrent) - private let processingQueue: OperationQueue - + private let queue = DispatchQueue(label: "io.getstream.list-database-observer", qos: .userInitiated) private var _items: [Item] = [] /// The items that have been fetched and mapped /// /// - Note: Fetches items synchronously if the observer is in the middle of processing a change. var rawItems: [Item] { - // When items are accessed while DB change is being processed in the background, - // we want to return the processing change immediately. - // Example: controller synchronizes which updates DB, but then controller wants the - // updated data while the processing is still in progress. - let state: (isProcessing: Bool, preparedItems: [Item]) = queue.sync { (_isProcessingDatabaseChange, _items) } - if !state.isProcessing { + let state: (needsValidation: Bool, preparedItems: [Item]) = queue.sync { (_rawItemsNeedsValidation, _items) } + if !state.needsValidation { return state.preparedItems } - // Otherwise fetch the state from the DB but also reusing existing state. + // Validate current items and update the _items property for the next access. + // Note that already cached items are reused (items loaded by the initial fetch). var items = [Item]() frc.managedObjectContext.performAndWait { - items = mapItems(changes: nil, reusableItems: state.preparedItems) + items = processItems(nil, reusableItems: state.preparedItems, rawItemsNeedsValidation: false, notify: false) } return items } - private var _isProcessingDatabaseChange = false + private var _rawItemsNeedsValidation = true private var _isInitialized: Bool = false private var isInitialized: Bool { get { queue.sync { _isInitialized } } - set { queue.async(flags: .barrier) { self._isInitialized = newValue } } + set { queue.async { self._isInitialized = newValue } } } deinit { @@ -98,19 +93,13 @@ class BackgroundDatabaseObserver { sectionNameKeyPath: nil, cacheName: nil ) - - let operationQueue = OperationQueue() - operationQueue.underlyingQueue = queue - operationQueue.name = "com.stream.database-observer" - operationQueue.maxConcurrentOperationCount = 1 - processingQueue = operationQueue - changeAggregator.onWillChange = { [weak self] in self?.notifyWillChange() } - changeAggregator.onDidChange = { [weak self] changes in - self?.updateItems(changes: changes) + guard let self else { return } + let reusableItems = queue.sync { self._items } + self.processItems(changes, reusableItems: reusableItems, rawItemsNeedsValidation: false, notify: true) } } @@ -129,89 +118,49 @@ class BackgroundDatabaseObserver { frc.delegate = changeAggregator - /// Start a process to get the items, which will then notify via its blocks. - getInitialItems() + // Start loading initial items and do not call delegates for the initial change + // because the initial state is available even when accessing rawItems before this + // function finishes. There can be a race between initial load and DB write which + // requires validation when rawItems are accessed. + frc.managedObjectContext.perform { + self.processItems(nil, reusableItems: [], rawItemsNeedsValidation: true, notify: true) + } } private func notifyWillChange() { - let setProcessingState: (Bool) -> Void = { [weak self] state in - self?.queue.async(flags: .barrier) { - self?._isProcessingDatabaseChange = state + let setNeedsValidation: (Bool) -> Void = { [weak self] state in + self?.queue.async { + self?._rawItemsNeedsValidation = state } } + setNeedsValidation(true) guard let onWillChange = onWillChange else { - setProcessingState(true) return } DispatchQueue.main.async { - setProcessingState(false) + // Do not allow reading the state directly from FRC when accessing rawItems in will change + setNeedsValidation(false) onWillChange() - setProcessingState(true) + setNeedsValidation(true) } } - private func notifyDidChange(changes: [ListChange], onCompletion: @escaping () -> Void) { + private func notifyDidChange(changes: [ListChange]) { guard let onDidChange = onDidChange else { - onCompletion() return } DispatchQueue.main.async { onDidChange(changes) - onCompletion() - } - } - - private func getInitialItems() { - notifyWillChange() - updateItems(changes: nil) - } - - /// This method will add a new operation to the `processingQueue`, where operations are executed one-by-one. - /// The operation added to the queue will start the process of getting new results for the observer. - private func updateItems(changes: [ListChange]?, completion: (() -> Void)? = nil) { - let operation = AsyncOperation { [weak self] _, done in - guard let self = self else { - done(.continue) - completion?() - return - } - // Operation queue runs on the same `self.queue` - let reusableItems = self._items - self.frc.managedObjectContext.perform { - self.processItems(changes, reusableItems: reusableItems) { - done(.continue) - completion?() - } - } } - - processingQueue.addOperation(operation) } /// This method will process the currently fetched objects, and will notify the listeners. /// When the process is done, it also updates the `_items`, which is the locally cached list of mapped items - /// This method will be called through an operation on `processingQueue`, which will serialize the execution until `onCompletion` is called. - private func processItems(_ changes: [ListChange]?, reusableItems: [Item], onCompletion: @escaping () -> Void) { - let items = mapItems(changes: changes, reusableItems: reusableItems) - - /// We want to make sure that nothing else but this block is happening in this queue when updating `_items` - /// This also includes finishing the operation and notifying about the update. Only once everything is done, we conclude the operation. - queue.async(flags: .barrier) { - self._items = items - self._isProcessingDatabaseChange = false - let returnedChanges = changes ?? items.enumerated().map { .insert($1, index: IndexPath(item: $0, section: 0)) } - self.notifyDidChange(changes: returnedChanges, onCompletion: onCompletion) - } - } - - /// This method will asynchronously convert all the fetched objects into models. - /// This method is intended to be called from the `managedObjectContext` that is publishing the changes (The one linked to the `NSFetchedResultsController` - /// in this case). - /// Once the objects are mapped, those are sorted based on `sorting` - private func mapItems(changes: [ListChange]?, reusableItems: [Item]) -> [Item] { + /// - Important: Must be called from the managed object's perform closure. + @discardableResult private func processItems(_ changes: [ListChange]?, reusableItems: [Item], rawItemsNeedsValidation: Bool, notify: Bool) -> [Item] { let objects = frc.fetchedObjects ?? [] - return DatabaseItemConverter.convert( + let items = DatabaseItemConverter.convert( dtos: objects, existing: reusableItems, changes: changes, @@ -219,5 +168,14 @@ class BackgroundDatabaseObserver { itemReuseKeyPaths: itemReuseKeyPaths, sorting: sorting ) + queue.async { + self._items = items + self._rawItemsNeedsValidation = rawItemsNeedsValidation + guard notify else { return } + let returnedChanges = changes ?? items.enumerated().map { .insert($1, index: IndexPath(item: $0, section: 0)) } + self.notifyDidChange(changes: returnedChanges) + } + + return items } } diff --git a/Tests/StreamChatTests/BackgroundListDatabaseObserver_Tests.swift b/Tests/StreamChatTests/BackgroundListDatabaseObserver_Tests.swift index a5b5cb3e9e..a783274638 100644 --- a/Tests/StreamChatTests/BackgroundListDatabaseObserver_Tests.swift +++ b/Tests/StreamChatTests/BackgroundListDatabaseObserver_Tests.swift @@ -54,11 +54,6 @@ final class BackgroundListDatabaseObserver_Tests: XCTestCase { } func test_changeAggregatorSetup() throws { - let expectation1 = expectation(description: "onWillChange is called") - observer.onWillChange = { - expectation1.fulfill() - } - let expectation2 = expectation(description: "onDidChange is called") observer.onDidChange = { _ in expectation2.fulfill() @@ -206,6 +201,28 @@ final class BackgroundListDatabaseObserver_Tests: XCTestCase { XCTAssertEqual(expectedIds, observer.items.map { $0 }) } + func test_accessingItems_whenObservationStartsWithEmptyDBAndWriteHappens_thenWrittenDataIsReturned() throws { + let initialFinishedExpectation = XCTestExpectation(description: "Initial") + observer = BackgroundListDatabaseObserver( + database: database, + fetchRequest: fetchRequest, + itemCreator: { $0.testId }, + sorting: [] + ) + observer.onDidChange = { [initialFinishedExpectation] _ in + initialFinishedExpectation.fulfill() + } + try observer.startObserving() + // Immediate write after starting to observe (race between initial load and DB change) + try database.writeSynchronously { session in + let context = try XCTUnwrap(session as? NSManagedObjectContext) + let item = try XCTUnwrap(NSEntityDescription.insertNewObject(forEntityName: "TestManagedObject", into: context) as? TestManagedObject) + item.testId = "1" + item.testValue = "testValue1" + } + XCTAssertEqual(["1"], observer.items.map { $0 }) + } + // MARK: - private func startObservingAndWaitForInitialResults() throws {