Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix reading the local database state just after the initial write #3373

Merged
merged 5 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
### ⚡ Performance
- Use background database observers for `CurrentUserController.currentUser`, `ChatChannelMemberListController.members`, and `ChatMessageController.message` which reduces CPU usage on the main thread [#3357](https://github.com/GetStream/stream-chat-swift/pull/3357)
- `CurrentChatUserController` was often recreated when sending typing events [#3372](https://github.com/GetStream/stream-chat-swift/pull/3372)
- Reduce latency of the `BackgroundDatabaseObserver` by reducing thread switching when handling changes [#3373](https://github.com/GetStream/stream-chat-swift/pull/3373)
laevandus marked this conversation as resolved.
Show resolved Hide resolved
### 🐞 Fixed
- Fix rare crashes when deleting local database content on logout [#3355](https://github.com/GetStream/stream-chat-swift/pull/3355)
- Fix rare crashes in `MulticastDelegate` when accessing it concurrently [#3361](https://github.com/GetStream/stream-chat-swift/pull/3361)
- Fix reading the local database state just after the initial write [#3373](https://github.com/GetStream/stream-chat-swift/pull/3373)
- Fix a timing issue where accessing channels in `controllerWillChangeChannels` returned already changed channels [#3373](https://github.com/GetStream/stream-chat-swift/pull/3373)
### 🔄 Changed
- Made loadBlockedUsers in ConnectedUser public [#3352](https://github.com/GetStream/stream-chat-swift/pull/3352)
- Removed Agora and 100ms related code, the recommended way to support calls is to use StreamVideo [#3364](https://github.com/GetStream/stream-chat-swift/pull/3364)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,37 +28,37 @@ class BackgroundDatabaseObserver<Item, DTO: NSManagedObject> {
/// When called, notification observers are released
var releaseNotificationObservers: (() -> Void)?

private let queue = DispatchQueue(label: "io.getstream.list-database-observer", qos: .userInitiated, attributes: .concurrent)
private let processingQueue: OperationQueue

private var _items: [Item] = []
private let queue = DispatchQueue(label: "io.getstream.list-database-observer", qos: .userInitiated)
private var _items: [Item]?

// State handling for supporting will change, because in the callback we should return the previous state.
private var _willChangeItems: [Item]?
private var _notifyingWillChange = false

/// The items that have been fetched and mapped
///
/// - Note: Fetches items synchronously if the observer is in the middle of processing a change.
var rawItems: [Item] {
// When items are accessed while DB change is being processed in the background,
// we want to return the processing change immediately.
// Example: controller synchronizes which updates DB, but then controller wants the
// updated data while the processing is still in progress.
let state: (isProcessing: Bool, preparedItems: [Item]) = queue.sync { (_isProcessingDatabaseChange, _items) }
if !state.isProcessing {
return state.preparedItems
// During the onWillChange we swap the results back to the previous state because onWillChange
// is dispatched to the main thread and when the main thread handles it, observer has already processed
// the database change.
if onWillChange != nil {
let willChangeState: (active: Bool, cachedItems: [Item]?) = queue.sync { (_notifyingWillChange, _willChangeItems) }
if willChangeState.active {
return willChangeState.cachedItems ?? []
}
}
// Otherwise fetch the state from the DB but also reusing existing state.
var items = [Item]()

var rawItems: [Item]!
frc.managedObjectContext.performAndWait {
items = mapItems(changes: nil, reusableItems: state.preparedItems)
// When we already have loaded items, reuse them, otherwise refetch all
rawItems = _items ?? updateItems(nil)
}
return items
return rawItems
}

private var _isProcessingDatabaseChange = false

private var _isInitialized: Bool = false
private var isInitialized: Bool {
get { queue.sync { _isInitialized } }
set { queue.async(flags: .barrier) { self._isInitialized = newValue } }
set { queue.async { self._isInitialized = newValue } }
}

deinit {
Expand Down Expand Up @@ -98,19 +98,14 @@ class BackgroundDatabaseObserver<Item, DTO: NSManagedObject> {
sectionNameKeyPath: nil,
cacheName: nil
)

let operationQueue = OperationQueue()
operationQueue.underlyingQueue = queue
operationQueue.name = "com.stream.database-observer"
operationQueue.maxConcurrentOperationCount = 1
processingQueue = operationQueue

changeAggregator.onWillChange = { [weak self] in
self?.notifyWillChange()
}

changeAggregator.onDidChange = { [weak self] changes in
self?.updateItems(changes: changes)
guard let self else { return }
// Runs on the NSManagedObjectContext's queue, therefore skip performAndWait
self.updateItems(changes)
self.notifyDidChange(changes: changes)
}
}

Expand All @@ -129,95 +124,64 @@ class BackgroundDatabaseObserver<Item, DTO: NSManagedObject> {

frc.delegate = changeAggregator

/// Start a process to get the items, which will then notify via its blocks.
getInitialItems()
// Start loading initial items and call did change for the initial change.
frc.managedObjectContext.perform { [weak self] in
guard let self else { return }
let items = self.updateItems(nil)
let changes: [ListChange<Item>] = items.enumerated().map { .insert($1, index: IndexPath(item: $0, section: 0)) }
self.notifyDidChange(changes: changes)
}
}

private func notifyWillChange() {
let setProcessingState: (Bool) -> Void = { [weak self] state in
self?.queue.async(flags: .barrier) {
self?._isProcessingDatabaseChange = state
}
}

guard let onWillChange = onWillChange else {
setProcessingState(true)
return
}
DispatchQueue.main.async {
setProcessingState(false)
// Will change callback happens on the main thread but by that time the observer
// has already updated its local cached state. For allowing to access the previous
// state from the will change callback, there is no other way than caching previous state.
// This is used by the channel list delegate.

// `_items` is mutated by the NSManagedObjectContext's queue, here we are on that queue
// so it is safe to read the `_items` state from `self.queue`.
queue.sync {
_willChangeItems = _items
}
DispatchQueue.main.async { [weak self] in
guard let self else { return }
self.queue.async {
self._notifyingWillChange = true
}
onWillChange()
setProcessingState(true)
self.queue.async {
self._willChangeItems = nil
self._notifyingWillChange = false
}
}
}

private func notifyDidChange(changes: [ListChange<Item>], onCompletion: @escaping () -> Void) {
private func notifyDidChange(changes: [ListChange<Item>]) {
guard let onDidChange = onDidChange else {
onCompletion()
return
}
DispatchQueue.main.async {
onDidChange(changes)
onCompletion()
}
}

private func getInitialItems() {
notifyWillChange()
updateItems(changes: nil)
}

/// This method will add a new operation to the `processingQueue`, where operations are executed one-by-one.
/// The operation added to the queue will start the process of getting new results for the observer.
private func updateItems(changes: [ListChange<Item>]?, completion: (() -> Void)? = nil) {
let operation = AsyncOperation { [weak self] _, done in
guard let self = self else {
done(.continue)
completion?()
return
}
// Operation queue runs on the same `self.queue`
let reusableItems = self._items
self.frc.managedObjectContext.perform {
self.processItems(changes, reusableItems: reusableItems) {
done(.continue)
completion?()
}
}
}

processingQueue.addOperation(operation)
}

/// This method will process the currently fetched objects, and will notify the listeners.
/// When the process is done, it also updates the `_items`, which is the locally cached list of mapped items
/// This method will be called through an operation on `processingQueue`, which will serialize the execution until `onCompletion` is called.
private func processItems(_ changes: [ListChange<Item>]?, reusableItems: [Item], onCompletion: @escaping () -> Void) {
let items = mapItems(changes: changes, reusableItems: reusableItems)

/// We want to make sure that nothing else but this block is happening in this queue when updating `_items`
/// This also includes finishing the operation and notifying about the update. Only once everything is done, we conclude the operation.
queue.async(flags: .barrier) {
self._items = items
self._isProcessingDatabaseChange = false
let returnedChanges = changes ?? items.enumerated().map { .insert($1, index: IndexPath(item: $0, section: 0)) }
self.notifyDidChange(changes: returnedChanges, onCompletion: onCompletion)
}
}

/// This method will asynchronously convert all the fetched objects into models.
/// This method is intended to be called from the `managedObjectContext` that is publishing the changes (The one linked to the `NSFetchedResultsController`
/// in this case).
/// Once the objects are mapped, those are sorted based on `sorting`
private func mapItems(changes: [ListChange<Item>]?, reusableItems: [Item]) -> [Item] {
let objects = frc.fetchedObjects ?? []
return DatabaseItemConverter.convert(
dtos: objects,
existing: reusableItems,

/// Updates the locally cached items.
///
/// - Important: Must be called from the managed object's perform closure.
@discardableResult private func updateItems(_ changes: [ListChange<Item>]?) -> [Item] {
let items = DatabaseItemConverter.convert(
dtos: frc.fetchedObjects ?? [],
existing: _items ?? [],
changes: changes,
itemCreator: itemCreator,
itemReuseKeyPaths: itemReuseKeyPaths,
sorting: sorting
)
_items = items
return items
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ enum DatabaseItemConverter {
let items: [Item]

// Reuse converted items by id
if StreamRuntimeCheck._isDatabaseObserverItemReusingEnabled, let itemReuseKeyPaths, !existing.isEmpty {
if StreamRuntimeCheck._isDatabaseObserverItemReusingEnabled, let itemReuseKeyPaths {
let existingItems = existing.map { ($0[keyPath: itemReuseKeyPaths.item], $0) }
var lookup = Dictionary(existingItems, uniquingKeysWith: { _, second in second })
// Changes contains newly converted items, add them to the lookup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,6 @@ final class BackgroundListDatabaseObserver_Tests: XCTestCase {
}

func test_changeAggregatorSetup() throws {
let expectation1 = expectation(description: "onWillChange is called")
observer.onWillChange = {
expectation1.fulfill()
}
Comment on lines -57 to -60
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because data is accessible as soon as startObserving is called, I removed the will change delegate because it does not really make sense anymore (we allow immediate access). I kept the didChange because we still do async loading.


let expectation2 = expectation(description: "onDidChange is called")
observer.onDidChange = { _ in
expectation2.fulfill()
Expand Down Expand Up @@ -206,6 +201,30 @@ final class BackgroundListDatabaseObserver_Tests: XCTestCase {
XCTAssertEqual(expectedIds, observer.items.map { $0 })
}

func test_accessingItems_whenObservationStartsWithEmptyDBAndWriteHappens_thenWrittenDataIsReturned() throws {
let initialFinishedExpectation = XCTestExpectation(description: "Initial")
observer = BackgroundListDatabaseObserver<String, TestManagedObject>(
database: database,
fetchRequest: fetchRequest,
itemCreator: { $0.testId },
sorting: []
)
observer.onDidChange = { _ in
initialFinishedExpectation.fulfill()
}
try observer.startObserving()
// Immediate write after starting to observe (race between initial load and DB change)
try database.writeSynchronously { session in
let context = try XCTUnwrap(session as? NSManagedObjectContext)
let item = try XCTUnwrap(NSEntityDescription.insertNewObject(forEntityName: "TestManagedObject", into: context) as? TestManagedObject)
item.testId = "1"
item.testValue = "testValue1"
}
XCTAssertEqual(["1"], observer.items.map { $0 })
wait(for: [initialFinishedExpectation], timeout: defaultTimeout)
XCTAssertEqual(["1"], observer.items.map { $0 })
}

// MARK: -

private func startObservingAndWaitForInitialResults() throws {
Expand Down
1 change: 0 additions & 1 deletion Tests/StreamChatTests/StreamChatFlakyTests.xctestplan
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
"AttachmentQueueUploader_Tests\/test_uploader_whenPostProcessorAvailable_shouldChangeTheAttachmentPayload()",
"AttachmentQueueUploader_Tests\/test_uploader_whenUploadFails_markMessageAsFailed()",
"ChannelController_Tests\/test_createCall_propagatesResultFromUpdater()",
"ChannelListController_Tests\/test_willAndDidCallbacks_areCalledInCorrectOrder()",
"ChannelListPayload_Tests\/test_decode_bigChannelListPayload()",
"ChannelListPayload_Tests\/test_hugeChannelListQuery_save()",
"ChannelListPayload_Tests\/test_hugeChannelListQuery_save_DB_empty()",
Expand Down
1 change: 0 additions & 1 deletion Tests/StreamChatTests/StreamChatTestPlan.xctestplan
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
"AttachmentQueueUploader_Tests\/test_uploader_whenPostProcessorAvailable_shouldChangeTheAttachmentPayload()",
"AttachmentQueueUploader_Tests\/test_uploader_whenUploadFails_markMessageAsFailed()",
"ChannelController_Tests\/test_createCall_propagatesResultFromUpdater()",
"ChannelListController_Tests\/test_willAndDidCallbacks_areCalledInCorrectOrder()",
"ChannelListPayload_Tests\/test_decode_bigChannelListPayload()",
"ChannelListPayload_Tests\/test_hugeChannelListQuery_save()",
"ChannelListPayload_Tests\/test_hugeChannelListQuery_save_DB_empty()",
Expand Down
Loading