Skip to content

Commit

Permalink
Run offline state sync in the background instead of pausing other API…
Browse files Browse the repository at this point in the history
… requests while it runs
  • Loading branch information
laevandus committed Aug 9, 2024
1 parent 05c210b commit 3ad3ed3
Show file tree
Hide file tree
Showing 20 changed files with 487 additions and 216 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
### 🔄 Changed
- Made loadBlockedUsers in ConnectedUser public [#3352](https://github.com/GetStream/stream-chat-swift/pull/3352)
- Removed Agora and 100ms related code, the recommended way to support calls is to use StreamVideo [#3364](https://github.com/GetStream/stream-chat-swift/pull/3364)
- Run offline state sync in the background instead of pausing other API requests while it runs [#3367](https://github.com/GetStream/stream-chat-swift/pull/3367)

# [4.61.0](https://github.com/GetStream/stream-chat-swift/releases/tag/4.61.0)
_July 30, 2024_
Expand Down
14 changes: 5 additions & 9 deletions Sources/StreamChat/ChatClient+Environment.swift
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,6 @@ extension ChatClient {

var syncRepositoryBuilder: (
_ config: ChatClientConfig,
_ activeChannelControllers: ThreadSafeWeakCollection<ChatChannelController>,
_ activeChannelListControllers: ThreadSafeWeakCollection<ChatChannelListController>,
_ offlineRequestsRepository: OfflineRequestsRepository,
_ eventNotificationCenter: EventNotificationCenter,
_ database: DatabaseContainer,
Expand All @@ -134,13 +132,11 @@ extension ChatClient {
) -> SyncRepository = {
SyncRepository(
config: $0,
activeChannelControllers: $1,
activeChannelListControllers: $2,
offlineRequestsRepository: $3,
eventNotificationCenter: $4,
database: $5,
apiClient: $6,
channelListUpdater: $7
offlineRequestsRepository: $1,
eventNotificationCenter: $2,
database: $3,
apiClient: $4,
channelListUpdater: $5
)
}

Expand Down
31 changes: 0 additions & 31 deletions Sources/StreamChat/ChatClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,6 @@ public class ChatClient {
/// work if needed (i.e. when a new message pending sent appears in the database, a worker tries to send it.)
private(set) var backgroundWorkers: [Worker] = []

/// Keeps a weak reference to the active channel list controllers to ensure a proper recovery when coming back online
private(set) var activeChannelListControllers = ThreadSafeWeakCollection<ChatChannelListController>()
private(set) var activeChannelControllers = ThreadSafeWeakCollection<ChatChannelController>()

/// Background worker that takes care about client connection recovery when the Internet comes back OR app transitions from background to foreground.
private(set) var connectionRecoveryHandler: ConnectionRecoveryHandler?

Expand Down Expand Up @@ -167,8 +163,6 @@ public class ChatClient {
)
let syncRepository = environment.syncRepositoryBuilder(
config,
activeChannelControllers,
activeChannelListControllers,
offlineRequestsRepository,
eventNotificationCenter,
databaseContainer,
Expand Down Expand Up @@ -469,8 +463,6 @@ public class ChatClient {
authenticationRepository.logOutUser()

// Stop tracking active components
activeChannelControllers.removeAllObjects()
activeChannelListControllers.removeAllObjects()
syncRepository.removeAllTracked()

let group = DispatchGroup()
Expand Down Expand Up @@ -591,29 +583,6 @@ public class ChatClient {
]
}

func startTrackingChannelController(_ channelController: ChatChannelController) {
// If it is already tracking, do nothing.
guard !activeChannelControllers.contains(channelController) else {
return
}
activeChannelControllers.add(channelController)
}

func stopTrackingChannelController(_ channelController: ChatChannelController) {
activeChannelControllers.remove(channelController)
}

func startTrackingChannelListController(_ channelListController: ChatChannelListController) {
guard !activeChannelListControllers.contains(channelListController) else {
return
}
activeChannelListControllers.add(channelListController)
}

func stopTrackingChannelListController(_ channelListController: ChatChannelListController) {
activeChannelListControllers.remove(channelListController)
}

func completeConnectionIdWaiters(connectionId: String?) {
connectionRepository.completeConnectionIdWaiters(connectionId: connectionId)
}
Expand Down
5 changes: 5 additions & 0 deletions Sources/StreamChat/Config/StreamRuntimeCheck.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,9 @@ public enum StreamRuntimeCheck {
///
/// Enables reusing unchanged converted items in database observers.
public static var _isDatabaseObserverItemReusingEnabled = true

/// For *internal use* only
///
/// Uses vresion 2 for offline state sync.
public static var _isSyncV2Enabled = true
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public class ChatChannelController: DataController, DelegateCallable, DataStoreP
}

override public func synchronize(_ completion: ((_ error: Error?) -> Void)? = nil) {
client.startTrackingChannelController(self)
client.syncRepository.startTrackingChannelController(self)
synchronize(isInRecoveryMode: false, completion)
}

Expand Down Expand Up @@ -1030,7 +1030,7 @@ public class ChatChannelController: DataController, DelegateCallable, DataStoreP
return
}

client.startTrackingChannelController(self)
client.syncRepository.startTrackingChannelController(self)

updater.startWatching(cid: cid, isInRecoveryMode: isInRecoveryMode) { error in
self.state = error.map { .remoteDataFetchFailed(ClientError(with: $0)) } ?? .remoteDataFetched
Expand Down Expand Up @@ -1064,7 +1064,7 @@ public class ChatChannelController: DataController, DelegateCallable, DataStoreP
return
}

client.stopTrackingChannelController(self)
client.syncRepository.stopTrackingChannelController(self)

updater.stopWatching(cid: cid) { error in
self.state = error.map { .remoteDataFetchFailed(ClientError(with: $0)) } ?? .localDataFetched
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public class ChatChannelListController: DataController, DelegateCallable, DataSt
override public func synchronize(_ completion: ((_ error: Error?) -> Void)? = nil) {
startChannelListObserverIfNeeded()
channelListLinker.start(with: client.eventNotificationCenter)
client.startTrackingChannelListController(self)
client.syncRepository.startTrackingChannelListController(self)
updateChannelList(completion)
}

Expand Down Expand Up @@ -191,6 +191,11 @@ public class ChatChannelListController: DataController, DelegateCallable, DataSt

// MARK: - Internal

func refreshLoadedChannels(completion: @escaping (Result<Set<ChannelId>, Error>) -> Void) {
let channelCount = channelListObserver.items.count
worker.refreshLoadedChannels(for: query, channelCount: channelCount, completion: completion)
}

func resetQuery(
watchedAndSynchedChannelIds: Set<ChannelId>,
synchedChannelIds: Set<ChannelId>,
Expand Down
99 changes: 96 additions & 3 deletions Sources/StreamChat/Repositories/SyncOperations.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,105 @@ final class SyncContext {
var synchedChannelIds: Set<ChannelId> = Set()
var watchedAndSynchedChannelIds: Set<ChannelId> = Set()
var unwantedChannelIds: Set<ChannelId> = Set()
var missingEventSyncSuccessful = false

init(lastSyncAt: Date) {
self.lastSyncAt = lastSyncAt
}

var canRefreshChannelLists: Bool {
!missingEventSyncSuccessful
}
}

private let syncOperationsMaximumRetries = 2

final class ActiveChannelIdsOperation: AsyncOperation {
init(
syncRepository: SyncRepository,
context: SyncContext
) {
super.init(maxRetries: syncOperationsMaximumRetries) { [weak syncRepository] _, done in
guard let syncRepository else {
done(.continue)
return
}

context.localChannelIds.append(contentsOf: syncRepository.activeChannelControllers.allObjects.compactMap(\.cid))
context.localChannelIds.append(contentsOf:
syncRepository.activeChannelListControllers.allObjects
.map(\.channels)
.flatMap { $0 }
.map(\.cid)
)

// Main actor requirement
DispatchQueue.main.async {
context.localChannelIds.append(contentsOf: syncRepository.activeChats.allObjects.compactMap { try? $0.cid })
context.localChannelIds.append(contentsOf:
syncRepository.activeChannelLists.allObjects
.map(\.state.channels)
.flatMap { $0 }
.map(\.cid)
)
context.localChannelIds = Array(Set(context.localChannelIds))
log.info("Found \(context.localChannelIds.count) active channels", subsystems: .offlineSupport)
done(.continue)
}
}
}
}

final class RefreshChannelListOperation: AsyncOperation {
init(controller: ChatChannelListController, context: SyncContext) {
super.init(maxRetries: syncOperationsMaximumRetries) { [weak controller] _, done in
guard context.canRefreshChannelLists else {
done(.continue)
return
}
guard let controller = controller, controller.canBeRecovered else {
done(.continue)
return
}
controller.refreshLoadedChannels { result in
switch result {
case .success(let channelIds):
log.debug("Synced \(channelIds.count) channels in a channel list controller (\(controller.query.filter)", subsystems: .offlineSupport)
context.synchedChannelIds.formUnion(channelIds)
done(.continue)
case .failure(let error):
log.error("Failed refreshing channel list controller (\(controller.query.filter) with error \(error)", subsystems: .offlineSupport)
done(.retry)
}
}
}
}

init(channelList: ChannelList, context: SyncContext) {
super.init(maxRetries: syncOperationsMaximumRetries) { [weak channelList] _, done in
guard context.canRefreshChannelLists else {
done(.continue)
return
}
guard let channelList else {
done(.continue)
return
}
Task {
do {
let channelIds = try await channelList.refreshLoadedChannels()
log.debug("Synced \(channelIds.count) channels in a channel list (\(channelList.query.filter)", subsystems: .offlineSupport)
context.synchedChannelIds.formUnion(channelIds)
done(.continue)
} catch {
log.error("Failed refreshing channel list (\(channelList.query.filter) with error \(error)", subsystems: .offlineSupport)
done(.retry)
}
}
}
}
}

final class GetChannelIdsOperation: AsyncOperation {
init(database: DatabaseContainer, context: SyncContext, activeChannelIds: [ChannelId]) {
super.init(maxRetries: syncOperationsMaximumRetries) { [weak database] _, done in
Expand All @@ -39,7 +130,7 @@ final class GetChannelIdsOperation: AsyncOperation {
}

final class SyncEventsOperation: AsyncOperation {
init(syncRepository: SyncRepository, context: SyncContext) {
init(syncRepository: SyncRepository, context: SyncContext, recovery: Bool) {
super.init(maxRetries: syncOperationsMaximumRetries) { [weak syncRepository] _, done in
log.info(
"1. Call `/sync` endpoint and get missing events for all locally existed channels",
Expand All @@ -49,14 +140,16 @@ final class SyncEventsOperation: AsyncOperation {
syncRepository?.syncChannelsEvents(
channelIds: context.localChannelIds,
lastSyncAt: context.lastSyncAt,
isRecovery: true
isRecovery: recovery
) { result in
switch result {
case let .success(channelIds):
context.synchedChannelIds = Set(channelIds)
context.missingEventSyncSuccessful = true
done(.continue)
case let .failure(error):
context.synchedChannelIds = Set([])
context.missingEventSyncSuccessful = false
done(error.shouldRetry ? .retry : .continue)
}
}
Expand Down Expand Up @@ -182,7 +275,7 @@ final class DeleteUnwantedChannelsOperation: AsyncOperation {
final class ExecutePendingOfflineActions: AsyncOperation {
init(offlineRequestsRepository: OfflineRequestsRepository) {
super.init(maxRetries: syncOperationsMaximumRetries) { [weak offlineRequestsRepository] _, done in
log.info("5. Running offline actions requests", subsystems: .offlineSupport)
log.info("Running offline actions requests", subsystems: .offlineSupport)
offlineRequestsRepository?.runQueuedRequests {
done(.continue)
}
Expand Down
Loading

0 comments on commit 3ad3ed3

Please sign in to comment.