diff --git a/CHANGELOG.md b/CHANGELOG.md index b04d21eb9ca..5ee66c2986b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,9 +7,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ### ⚡ Performance - Use background database observers for `CurrentUserController.currentUser`, `ChatChannelMemberListController.members`, and `ChatMessageController.message` which reduces CPU usage on the main thread [#3357](https://github.com/GetStream/stream-chat-swift/pull/3357) - `CurrentChatUserController` was often recreated when sending typing events [#3372](https://github.com/GetStream/stream-chat-swift/pull/3372) +- Reduce latency of the `BackgroundDatabaseObserver` by reducing thread switching when handling changes [#3373](https://github.com/GetStream/stream-chat-swift/pull/3373) ### 🐞 Fixed - Fix rare crashes when deleting local database content on logout [#3355](https://github.com/GetStream/stream-chat-swift/pull/3355) - Fix rare crashes in `MulticastDelegate` when accessing it concurrently [#3361](https://github.com/GetStream/stream-chat-swift/pull/3361) +- Fix reading the local database state just after the initial write [#3373](https://github.com/GetStream/stream-chat-swift/pull/3373) +- Fix a timing issue where accessing channels in `controllerWillChangeChannels` returned already changed channels [#3373](https://github.com/GetStream/stream-chat-swift/pull/3373) ### 🔄 Changed - Made loadBlockedUsers in ConnectedUser public [#3352](https://github.com/GetStream/stream-chat-swift/pull/3352) - Removed Agora and 100ms related code, the recommended way to support calls is to use StreamVideo [#3364](https://github.com/GetStream/stream-chat-swift/pull/3364) diff --git a/Sources/StreamChat/Controllers/DatabaseObserver/BackgroundDatabaseObserver.swift b/Sources/StreamChat/Controllers/DatabaseObserver/BackgroundDatabaseObserver.swift index 9e72f3a53f2..2c854719600 100644 --- a/Sources/StreamChat/Controllers/DatabaseObserver/BackgroundDatabaseObserver.swift +++ b/Sources/StreamChat/Controllers/DatabaseObserver/BackgroundDatabaseObserver.swift @@ -28,37 +28,37 @@ class BackgroundDatabaseObserver { /// When called, notification observers are released var releaseNotificationObservers: (() -> Void)? - private let queue = DispatchQueue(label: "io.getstream.list-database-observer", qos: .userInitiated, attributes: .concurrent) - private let processingQueue: OperationQueue - - private var _items: [Item] = [] + private let queue = DispatchQueue(label: "io.getstream.list-database-observer", qos: .userInitiated) + private var _items: [Item]? + + // State handling for supporting will change, because in the callback we should return the previous state. + private var _willChangeItems: [Item]? + private var _notifyingWillChange = false /// The items that have been fetched and mapped - /// - /// - Note: Fetches items synchronously if the observer is in the middle of processing a change. var rawItems: [Item] { - // When items are accessed while DB change is being processed in the background, - // we want to return the processing change immediately. - // Example: controller synchronizes which updates DB, but then controller wants the - // updated data while the processing is still in progress. - let state: (isProcessing: Bool, preparedItems: [Item]) = queue.sync { (_isProcessingDatabaseChange, _items) } - if !state.isProcessing { - return state.preparedItems + // During the onWillChange we swap the results back to the previous state because onWillChange + // is dispatched to the main thread and when the main thread handles it, observer has already processed + // the database change. + if onWillChange != nil { + let willChangeState: (active: Bool, cachedItems: [Item]?) = queue.sync { (_notifyingWillChange, _willChangeItems) } + if willChangeState.active { + return willChangeState.cachedItems ?? [] + } } - // Otherwise fetch the state from the DB but also reusing existing state. - var items = [Item]() + + var rawItems: [Item]! frc.managedObjectContext.performAndWait { - items = mapItems(changes: nil, reusableItems: state.preparedItems) + // When we already have loaded items, reuse them, otherwise refetch all + rawItems = _items ?? updateItems(nil) } - return items + return rawItems } - - private var _isProcessingDatabaseChange = false private var _isInitialized: Bool = false private var isInitialized: Bool { get { queue.sync { _isInitialized } } - set { queue.async(flags: .barrier) { self._isInitialized = newValue } } + set { queue.async { self._isInitialized = newValue } } } deinit { @@ -98,19 +98,14 @@ class BackgroundDatabaseObserver { sectionNameKeyPath: nil, cacheName: nil ) - - let operationQueue = OperationQueue() - operationQueue.underlyingQueue = queue - operationQueue.name = "com.stream.database-observer" - operationQueue.maxConcurrentOperationCount = 1 - processingQueue = operationQueue - changeAggregator.onWillChange = { [weak self] in self?.notifyWillChange() } - changeAggregator.onDidChange = { [weak self] changes in - self?.updateItems(changes: changes) + guard let self else { return } + // Runs on the NSManagedObjectContext's queue, therefore skip performAndWait + self.updateItems(changes) + self.notifyDidChange(changes: changes) } } @@ -129,95 +124,64 @@ class BackgroundDatabaseObserver { frc.delegate = changeAggregator - /// Start a process to get the items, which will then notify via its blocks. - getInitialItems() + // Start loading initial items and call did change for the initial change. + frc.managedObjectContext.perform { [weak self] in + guard let self else { return } + let items = self.updateItems(nil) + let changes: [ListChange] = items.enumerated().map { .insert($1, index: IndexPath(item: $0, section: 0)) } + self.notifyDidChange(changes: changes) + } } private func notifyWillChange() { - let setProcessingState: (Bool) -> Void = { [weak self] state in - self?.queue.async(flags: .barrier) { - self?._isProcessingDatabaseChange = state - } - } - guard let onWillChange = onWillChange else { - setProcessingState(true) return } - DispatchQueue.main.async { - setProcessingState(false) + // Will change callback happens on the main thread but by that time the observer + // has already updated its local cached state. For allowing to access the previous + // state from the will change callback, there is no other way than caching previous state. + // This is used by the channel list delegate. + + // `_items` is mutated by the NSManagedObjectContext's queue, here we are on that queue + // so it is safe to read the `_items` state from `self.queue`. + queue.sync { + _willChangeItems = _items + } + DispatchQueue.main.async { [weak self] in + guard let self else { return } + self.queue.async { + self._notifyingWillChange = true + } onWillChange() - setProcessingState(true) + self.queue.async { + self._willChangeItems = nil + self._notifyingWillChange = false + } } } - private func notifyDidChange(changes: [ListChange], onCompletion: @escaping () -> Void) { + private func notifyDidChange(changes: [ListChange]) { guard let onDidChange = onDidChange else { - onCompletion() return } DispatchQueue.main.async { onDidChange(changes) - onCompletion() } } - - private func getInitialItems() { - notifyWillChange() - updateItems(changes: nil) - } - - /// This method will add a new operation to the `processingQueue`, where operations are executed one-by-one. - /// The operation added to the queue will start the process of getting new results for the observer. - private func updateItems(changes: [ListChange]?, completion: (() -> Void)? = nil) { - let operation = AsyncOperation { [weak self] _, done in - guard let self = self else { - done(.continue) - completion?() - return - } - // Operation queue runs on the same `self.queue` - let reusableItems = self._items - self.frc.managedObjectContext.perform { - self.processItems(changes, reusableItems: reusableItems) { - done(.continue) - completion?() - } - } - } - - processingQueue.addOperation(operation) - } - - /// This method will process the currently fetched objects, and will notify the listeners. - /// When the process is done, it also updates the `_items`, which is the locally cached list of mapped items - /// This method will be called through an operation on `processingQueue`, which will serialize the execution until `onCompletion` is called. - private func processItems(_ changes: [ListChange]?, reusableItems: [Item], onCompletion: @escaping () -> Void) { - let items = mapItems(changes: changes, reusableItems: reusableItems) - - /// We want to make sure that nothing else but this block is happening in this queue when updating `_items` - /// This also includes finishing the operation and notifying about the update. Only once everything is done, we conclude the operation. - queue.async(flags: .barrier) { - self._items = items - self._isProcessingDatabaseChange = false - let returnedChanges = changes ?? items.enumerated().map { .insert($1, index: IndexPath(item: $0, section: 0)) } - self.notifyDidChange(changes: returnedChanges, onCompletion: onCompletion) - } - } - - /// This method will asynchronously convert all the fetched objects into models. - /// This method is intended to be called from the `managedObjectContext` that is publishing the changes (The one linked to the `NSFetchedResultsController` - /// in this case). - /// Once the objects are mapped, those are sorted based on `sorting` - private func mapItems(changes: [ListChange]?, reusableItems: [Item]) -> [Item] { - let objects = frc.fetchedObjects ?? [] - return DatabaseItemConverter.convert( - dtos: objects, - existing: reusableItems, + + /// Updates the locally cached items. + /// + /// - Important: Must be called from the managed object's perform closure. + @discardableResult private func updateItems(_ changes: [ListChange]?) -> [Item] { + let items = DatabaseItemConverter.convert( + dtos: frc.fetchedObjects ?? [], + existing: _items ?? [], changes: changes, itemCreator: itemCreator, itemReuseKeyPaths: itemReuseKeyPaths, sorting: sorting ) + _items = items + return items } } diff --git a/Sources/StreamChat/Utils/Database/DatabaseItemConverter.swift b/Sources/StreamChat/Utils/Database/DatabaseItemConverter.swift index cd7425fcb7a..2d9666bbeba 100644 --- a/Sources/StreamChat/Utils/Database/DatabaseItemConverter.swift +++ b/Sources/StreamChat/Utils/Database/DatabaseItemConverter.swift @@ -27,7 +27,7 @@ enum DatabaseItemConverter { let items: [Item] // Reuse converted items by id - if StreamRuntimeCheck._isDatabaseObserverItemReusingEnabled, let itemReuseKeyPaths, !existing.isEmpty { + if StreamRuntimeCheck._isDatabaseObserverItemReusingEnabled, let itemReuseKeyPaths { let existingItems = existing.map { ($0[keyPath: itemReuseKeyPaths.item], $0) } var lookup = Dictionary(existingItems, uniquingKeysWith: { _, second in second }) // Changes contains newly converted items, add them to the lookup diff --git a/Tests/StreamChatTests/BackgroundListDatabaseObserver_Tests.swift b/Tests/StreamChatTests/BackgroundListDatabaseObserver_Tests.swift index a5b5cb3e9eb..a9b229e8fd1 100644 --- a/Tests/StreamChatTests/BackgroundListDatabaseObserver_Tests.swift +++ b/Tests/StreamChatTests/BackgroundListDatabaseObserver_Tests.swift @@ -54,11 +54,6 @@ final class BackgroundListDatabaseObserver_Tests: XCTestCase { } func test_changeAggregatorSetup() throws { - let expectation1 = expectation(description: "onWillChange is called") - observer.onWillChange = { - expectation1.fulfill() - } - let expectation2 = expectation(description: "onDidChange is called") observer.onDidChange = { _ in expectation2.fulfill() @@ -206,6 +201,30 @@ final class BackgroundListDatabaseObserver_Tests: XCTestCase { XCTAssertEqual(expectedIds, observer.items.map { $0 }) } + func test_accessingItems_whenObservationStartsWithEmptyDBAndWriteHappens_thenWrittenDataIsReturned() throws { + let initialFinishedExpectation = XCTestExpectation(description: "Initial") + observer = BackgroundListDatabaseObserver( + database: database, + fetchRequest: fetchRequest, + itemCreator: { $0.testId }, + sorting: [] + ) + observer.onDidChange = { _ in + initialFinishedExpectation.fulfill() + } + try observer.startObserving() + // Immediate write after starting to observe (race between initial load and DB change) + try database.writeSynchronously { session in + let context = try XCTUnwrap(session as? NSManagedObjectContext) + let item = try XCTUnwrap(NSEntityDescription.insertNewObject(forEntityName: "TestManagedObject", into: context) as? TestManagedObject) + item.testId = "1" + item.testValue = "testValue1" + } + XCTAssertEqual(["1"], observer.items.map { $0 }) + wait(for: [initialFinishedExpectation], timeout: defaultTimeout) + XCTAssertEqual(["1"], observer.items.map { $0 }) + } + // MARK: - private func startObservingAndWaitForInitialResults() throws { diff --git a/Tests/StreamChatTests/StreamChatFlakyTests.xctestplan b/Tests/StreamChatTests/StreamChatFlakyTests.xctestplan index 28c65f4fe4e..48bc4be4100 100644 --- a/Tests/StreamChatTests/StreamChatFlakyTests.xctestplan +++ b/Tests/StreamChatTests/StreamChatFlakyTests.xctestplan @@ -29,7 +29,6 @@ "AttachmentQueueUploader_Tests\/test_uploader_whenPostProcessorAvailable_shouldChangeTheAttachmentPayload()", "AttachmentQueueUploader_Tests\/test_uploader_whenUploadFails_markMessageAsFailed()", "ChannelController_Tests\/test_createCall_propagatesResultFromUpdater()", - "ChannelListController_Tests\/test_willAndDidCallbacks_areCalledInCorrectOrder()", "ChannelListPayload_Tests\/test_decode_bigChannelListPayload()", "ChannelListPayload_Tests\/test_hugeChannelListQuery_save()", "ChannelListPayload_Tests\/test_hugeChannelListQuery_save_DB_empty()", diff --git a/Tests/StreamChatTests/StreamChatTestPlan.xctestplan b/Tests/StreamChatTests/StreamChatTestPlan.xctestplan index 5295c1cc5d9..e687c32c7e6 100644 --- a/Tests/StreamChatTests/StreamChatTestPlan.xctestplan +++ b/Tests/StreamChatTests/StreamChatTestPlan.xctestplan @@ -43,7 +43,6 @@ "AttachmentQueueUploader_Tests\/test_uploader_whenPostProcessorAvailable_shouldChangeTheAttachmentPayload()", "AttachmentQueueUploader_Tests\/test_uploader_whenUploadFails_markMessageAsFailed()", "ChannelController_Tests\/test_createCall_propagatesResultFromUpdater()", - "ChannelListController_Tests\/test_willAndDidCallbacks_areCalledInCorrectOrder()", "ChannelListPayload_Tests\/test_decode_bigChannelListPayload()", "ChannelListPayload_Tests\/test_hugeChannelListQuery_save()", "ChannelListPayload_Tests\/test_hugeChannelListQuery_save_DB_empty()",