Skip to content

Commit

Permalink
Fix reading the local database state just after the initial write. Re…
Browse files Browse the repository at this point in the history
…duce latency of the database observer
  • Loading branch information
laevandus committed Aug 13, 2024
1 parent d40a2f3 commit b6511ca
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 85 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
## StreamChat
### ⚡ Performance
- Use background database observers for `CurrentUserController.currentUser`, `ChatChannelMemberListController.members`, and `ChatMessageController.message` which reduces CPU usage on the main thread [#3357](https://github.com/GetStream/stream-chat-swift/pull/3357)
- Reduce latency of the `BackgroundDatabaseObserver` by reducing thread switching when handling changes [#3373](https://github.com/GetStream/stream-chat-swift/pull/3373)
### 🐞 Fixed
- Fix rare crashes when deleting local database content on logout [#3355](https://github.com/GetStream/stream-chat-swift/pull/3355)
- Fix rare crashes in `MulticastDelegate` when accessing it concurrently [#3361](https://github.com/GetStream/stream-chat-swift/pull/3361)
- Fix reading the local database state just after the initial write [#3373](https://github.com/GetStream/stream-chat-swift/pull/3373)
### 🔄 Changed
- Made loadBlockedUsers in ConnectedUser public [#3352](https://github.com/GetStream/stream-chat-swift/pull/3352)
- Removed Agora and 100ms related code, the recommended way to support calls is to use StreamVideo [#3364](https://github.com/GetStream/stream-chat-swift/pull/3364)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,37 +28,32 @@ class BackgroundDatabaseObserver<Item, DTO: NSManagedObject> {
/// When called, notification observers are released
var releaseNotificationObservers: (() -> Void)?

private let queue = DispatchQueue(label: "io.getstream.list-database-observer", qos: .userInitiated, attributes: .concurrent)
private let processingQueue: OperationQueue

private let queue = DispatchQueue(label: "io.getstream.list-database-observer", qos: .userInitiated)
private var _items: [Item] = []

/// The items that have been fetched and mapped
///
/// - Note: Fetches items synchronously if the observer is in the middle of processing a change.
var rawItems: [Item] {
// When items are accessed while DB change is being processed in the background,
// we want to return the processing change immediately.
// Example: controller synchronizes which updates DB, but then controller wants the
// updated data while the processing is still in progress.
let state: (isProcessing: Bool, preparedItems: [Item]) = queue.sync { (_isProcessingDatabaseChange, _items) }
if !state.isProcessing {
let state: (needsValidation: Bool, preparedItems: [Item]) = queue.sync { (_rawItemsNeedsValidation, _items) }
if !state.needsValidation {
return state.preparedItems
}
// Otherwise fetch the state from the DB but also reusing existing state.
// Validate current items and update the _items property for the next access.
// Note that already cached items are reused (items loaded by the initial fetch).
var items = [Item]()
frc.managedObjectContext.performAndWait {
items = mapItems(changes: nil, reusableItems: state.preparedItems)
items = processItems(nil, reusableItems: state.preparedItems, rawItemsNeedsValidation: false, notify: false)
}
return items
}

private var _isProcessingDatabaseChange = false
private var _rawItemsNeedsValidation = true

private var _isInitialized: Bool = false
private var isInitialized: Bool {
get { queue.sync { _isInitialized } }
set { queue.async(flags: .barrier) { self._isInitialized = newValue } }
set { queue.async { self._isInitialized = newValue } }
}

deinit {
Expand Down Expand Up @@ -98,19 +93,13 @@ class BackgroundDatabaseObserver<Item, DTO: NSManagedObject> {
sectionNameKeyPath: nil,
cacheName: nil
)

let operationQueue = OperationQueue()
operationQueue.underlyingQueue = queue
operationQueue.name = "com.stream.database-observer"
operationQueue.maxConcurrentOperationCount = 1
processingQueue = operationQueue

changeAggregator.onWillChange = { [weak self] in
self?.notifyWillChange()
}

changeAggregator.onDidChange = { [weak self] changes in
self?.updateItems(changes: changes)
guard let self else { return }
let reusableItems = queue.sync { self._items }
self.processItems(changes, reusableItems: reusableItems, rawItemsNeedsValidation: false, notify: true)
}
}

Expand All @@ -129,95 +118,64 @@ class BackgroundDatabaseObserver<Item, DTO: NSManagedObject> {

frc.delegate = changeAggregator

/// Start a process to get the items, which will then notify via its blocks.
getInitialItems()
// Start loading initial items and do not call delegates for the initial change
// because the initial state is available even when accessing rawItems before this
// function finishes. There can be a race between initial load and DB write which
// requires validation when rawItems are accessed.
frc.managedObjectContext.perform {
self.processItems(nil, reusableItems: [], rawItemsNeedsValidation: true, notify: true)
}
}

private func notifyWillChange() {
let setProcessingState: (Bool) -> Void = { [weak self] state in
self?.queue.async(flags: .barrier) {
self?._isProcessingDatabaseChange = state
let setNeedsValidation: (Bool) -> Void = { [weak self] state in
self?.queue.async {
self?._rawItemsNeedsValidation = state
}
}

setNeedsValidation(true)
guard let onWillChange = onWillChange else {
setProcessingState(true)
return
}
DispatchQueue.main.async {
setProcessingState(false)
// Do not allow reading the state directly from FRC when accessing rawItems in will change
setNeedsValidation(false)
onWillChange()
setProcessingState(true)
setNeedsValidation(true)
}
}

private func notifyDidChange(changes: [ListChange<Item>], onCompletion: @escaping () -> Void) {
private func notifyDidChange(changes: [ListChange<Item>]) {
guard let onDidChange = onDidChange else {
onCompletion()
return
}
DispatchQueue.main.async {
onDidChange(changes)
onCompletion()
}
}

private func getInitialItems() {
notifyWillChange()
updateItems(changes: nil)
}

/// This method will add a new operation to the `processingQueue`, where operations are executed one-by-one.
/// The operation added to the queue will start the process of getting new results for the observer.
private func updateItems(changes: [ListChange<Item>]?, completion: (() -> Void)? = nil) {
let operation = AsyncOperation { [weak self] _, done in
guard let self = self else {
done(.continue)
completion?()
return
}
// Operation queue runs on the same `self.queue`
let reusableItems = self._items
self.frc.managedObjectContext.perform {
self.processItems(changes, reusableItems: reusableItems) {
done(.continue)
completion?()
}
}
}

processingQueue.addOperation(operation)
}

/// This method will process the currently fetched objects, and will notify the listeners.
/// When the process is done, it also updates the `_items`, which is the locally cached list of mapped items
/// This method will be called through an operation on `processingQueue`, which will serialize the execution until `onCompletion` is called.
private func processItems(_ changes: [ListChange<Item>]?, reusableItems: [Item], onCompletion: @escaping () -> Void) {
let items = mapItems(changes: changes, reusableItems: reusableItems)

/// We want to make sure that nothing else but this block is happening in this queue when updating `_items`
/// This also includes finishing the operation and notifying about the update. Only once everything is done, we conclude the operation.
queue.async(flags: .barrier) {
self._items = items
self._isProcessingDatabaseChange = false
let returnedChanges = changes ?? items.enumerated().map { .insert($1, index: IndexPath(item: $0, section: 0)) }
self.notifyDidChange(changes: returnedChanges, onCompletion: onCompletion)
}
}

/// This method will asynchronously convert all the fetched objects into models.
/// This method is intended to be called from the `managedObjectContext` that is publishing the changes (The one linked to the `NSFetchedResultsController`
/// in this case).
/// Once the objects are mapped, those are sorted based on `sorting`
private func mapItems(changes: [ListChange<Item>]?, reusableItems: [Item]) -> [Item] {
/// - Important: Must be called from the managed object's perform closure.
@discardableResult private func processItems(_ changes: [ListChange<Item>]?, reusableItems: [Item], rawItemsNeedsValidation: Bool, notify: Bool) -> [Item] {
let objects = frc.fetchedObjects ?? []
return DatabaseItemConverter.convert(
let items = DatabaseItemConverter.convert(
dtos: objects,
existing: reusableItems,
changes: changes,
itemCreator: itemCreator,
itemReuseKeyPaths: itemReuseKeyPaths,
sorting: sorting
)
queue.async {
self._items = items
self._rawItemsNeedsValidation = rawItemsNeedsValidation
guard notify else { return }
let returnedChanges = changes ?? items.enumerated().map { .insert($1, index: IndexPath(item: $0, section: 0)) }
self.notifyDidChange(changes: returnedChanges)
}

return items
}
}
27 changes: 22 additions & 5 deletions Tests/StreamChatTests/BackgroundListDatabaseObserver_Tests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,6 @@ final class BackgroundListDatabaseObserver_Tests: XCTestCase {
}

func test_changeAggregatorSetup() throws {
let expectation1 = expectation(description: "onWillChange is called")
observer.onWillChange = {
expectation1.fulfill()
}

let expectation2 = expectation(description: "onDidChange is called")
observer.onDidChange = { _ in
expectation2.fulfill()
Expand Down Expand Up @@ -206,6 +201,28 @@ final class BackgroundListDatabaseObserver_Tests: XCTestCase {
XCTAssertEqual(expectedIds, observer.items.map { $0 })
}

func test_accessingItems_whenObservationStartsWithEmptyDBAndWriteHappens_thenWrittenDataIsReturned() throws {
let initialFinishedExpectation = XCTestExpectation(description: "Initial")
observer = BackgroundListDatabaseObserver<String, TestManagedObject>(
database: database,
fetchRequest: fetchRequest,
itemCreator: { $0.testId },
sorting: []
)
observer.onDidChange = { [initialFinishedExpectation] _ in
initialFinishedExpectation.fulfill()
}
try observer.startObserving()
// Immediate write after starting to observe (race between initial load and DB change)
try database.writeSynchronously { session in
let context = try XCTUnwrap(session as? NSManagedObjectContext)
let item = try XCTUnwrap(NSEntityDescription.insertNewObject(forEntityName: "TestManagedObject", into: context) as? TestManagedObject)
item.testId = "1"
item.testValue = "testValue1"
}
XCTAssertEqual(["1"], observer.items.map { $0 })
}

// MARK: -

private func startObservingAndWaitForInitialResults() throws {
Expand Down

0 comments on commit b6511ca

Please sign in to comment.