From 3ad3ed30b79740f063a000d2e12a5935458e162b Mon Sep 17 00:00:00 2001 From: Toomas Vahter Date: Fri, 9 Aug 2024 14:06:20 +0300 Subject: [PATCH] Run offline state sync in the background instead of pausing other API requests while it runs --- CHANGELOG.md | 1 + .../StreamChat/ChatClient+Environment.swift | 14 +- Sources/StreamChat/ChatClient.swift | 31 --- .../Config/StreamRuntimeCheck.swift | 5 + .../ChannelController/ChannelController.swift | 6 +- .../ChannelListController.swift | 7 +- .../Repositories/SyncOperations.swift | 99 ++++++++- .../Repositories/SyncRepository.swift | 159 ++++++++++++--- .../StreamChat/StateLayer/ChannelList.swift | 10 + Sources/StreamChat/StateLayer/Chat.swift | 4 +- .../StateLayer/ChatClient+Factory.swift | 4 +- .../Workers/ChannelListUpdater.swift | 50 +++++ .../ChatChannelListController_Mock.swift | 6 + .../Repositories/SyncRepository_Mock.swift | 20 +- .../SpyPattern/Spy/APIClient_Spy.swift | 1 + Tests/StreamChatTests/ChatClient_Tests.swift | 76 +------ .../ChannelController_Tests.swift | 14 +- .../ChannelListController_Tests.swift | 4 +- .../Repositories/SyncOperations_Tests.swift | 4 +- .../Repositories/SyncRepository_Tests.swift | 188 +++++++++++++----- 20 files changed, 487 insertions(+), 216 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 29d56b0b0a8..6f6923d7afa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ### 🔄 Changed - Made loadBlockedUsers in ConnectedUser public [#3352](https://github.com/GetStream/stream-chat-swift/pull/3352) - Removed Agora and 100ms related code, the recommended way to support calls is to use StreamVideo [#3364](https://github.com/GetStream/stream-chat-swift/pull/3364) +- Run offline state sync in the background instead of pausing other API requests while it runs [#3367](https://github.com/GetStream/stream-chat-swift/pull/3367) # [4.61.0](https://github.com/GetStream/stream-chat-swift/releases/tag/4.61.0) _July 30, 2024_ diff --git a/Sources/StreamChat/ChatClient+Environment.swift b/Sources/StreamChat/ChatClient+Environment.swift index 41c46948e39..df00cd15d87 100644 --- a/Sources/StreamChat/ChatClient+Environment.swift +++ b/Sources/StreamChat/ChatClient+Environment.swift @@ -124,8 +124,6 @@ extension ChatClient { var syncRepositoryBuilder: ( _ config: ChatClientConfig, - _ activeChannelControllers: ThreadSafeWeakCollection, - _ activeChannelListControllers: ThreadSafeWeakCollection, _ offlineRequestsRepository: OfflineRequestsRepository, _ eventNotificationCenter: EventNotificationCenter, _ database: DatabaseContainer, @@ -134,13 +132,11 @@ extension ChatClient { ) -> SyncRepository = { SyncRepository( config: $0, - activeChannelControllers: $1, - activeChannelListControllers: $2, - offlineRequestsRepository: $3, - eventNotificationCenter: $4, - database: $5, - apiClient: $6, - channelListUpdater: $7 + offlineRequestsRepository: $1, + eventNotificationCenter: $2, + database: $3, + apiClient: $4, + channelListUpdater: $5 ) } diff --git a/Sources/StreamChat/ChatClient.swift b/Sources/StreamChat/ChatClient.swift index 15a3454657b..84ee8441ecf 100644 --- a/Sources/StreamChat/ChatClient.swift +++ b/Sources/StreamChat/ChatClient.swift @@ -46,10 +46,6 @@ public class ChatClient { /// work if needed (i.e. when a new message pending sent appears in the database, a worker tries to send it.) private(set) var backgroundWorkers: [Worker] = [] - /// Keeps a weak reference to the active channel list controllers to ensure a proper recovery when coming back online - private(set) var activeChannelListControllers = ThreadSafeWeakCollection() - private(set) var activeChannelControllers = ThreadSafeWeakCollection() - /// Background worker that takes care about client connection recovery when the Internet comes back OR app transitions from background to foreground. private(set) var connectionRecoveryHandler: ConnectionRecoveryHandler? @@ -167,8 +163,6 @@ public class ChatClient { ) let syncRepository = environment.syncRepositoryBuilder( config, - activeChannelControllers, - activeChannelListControllers, offlineRequestsRepository, eventNotificationCenter, databaseContainer, @@ -469,8 +463,6 @@ public class ChatClient { authenticationRepository.logOutUser() // Stop tracking active components - activeChannelControllers.removeAllObjects() - activeChannelListControllers.removeAllObjects() syncRepository.removeAllTracked() let group = DispatchGroup() @@ -591,29 +583,6 @@ public class ChatClient { ] } - func startTrackingChannelController(_ channelController: ChatChannelController) { - // If it is already tracking, do nothing. - guard !activeChannelControllers.contains(channelController) else { - return - } - activeChannelControllers.add(channelController) - } - - func stopTrackingChannelController(_ channelController: ChatChannelController) { - activeChannelControllers.remove(channelController) - } - - func startTrackingChannelListController(_ channelListController: ChatChannelListController) { - guard !activeChannelListControllers.contains(channelListController) else { - return - } - activeChannelListControllers.add(channelListController) - } - - func stopTrackingChannelListController(_ channelListController: ChatChannelListController) { - activeChannelListControllers.remove(channelListController) - } - func completeConnectionIdWaiters(connectionId: String?) { connectionRepository.completeConnectionIdWaiters(connectionId: connectionId) } diff --git a/Sources/StreamChat/Config/StreamRuntimeCheck.swift b/Sources/StreamChat/Config/StreamRuntimeCheck.swift index 237b8b55eed..759ae45d8f9 100644 --- a/Sources/StreamChat/Config/StreamRuntimeCheck.swift +++ b/Sources/StreamChat/Config/StreamRuntimeCheck.swift @@ -31,4 +31,9 @@ public enum StreamRuntimeCheck { /// /// Enables reusing unchanged converted items in database observers. public static var _isDatabaseObserverItemReusingEnabled = true + + /// For *internal use* only + /// + /// Uses vresion 2 for offline state sync. + public static var _isSyncV2Enabled = true } diff --git a/Sources/StreamChat/Controllers/ChannelController/ChannelController.swift b/Sources/StreamChat/Controllers/ChannelController/ChannelController.swift index eaf187a2409..3a22e139fdc 100644 --- a/Sources/StreamChat/Controllers/ChannelController/ChannelController.swift +++ b/Sources/StreamChat/Controllers/ChannelController/ChannelController.swift @@ -229,7 +229,7 @@ public class ChatChannelController: DataController, DelegateCallable, DataStoreP } override public func synchronize(_ completion: ((_ error: Error?) -> Void)? = nil) { - client.startTrackingChannelController(self) + client.syncRepository.startTrackingChannelController(self) synchronize(isInRecoveryMode: false, completion) } @@ -1030,7 +1030,7 @@ public class ChatChannelController: DataController, DelegateCallable, DataStoreP return } - client.startTrackingChannelController(self) + client.syncRepository.startTrackingChannelController(self) updater.startWatching(cid: cid, isInRecoveryMode: isInRecoveryMode) { error in self.state = error.map { .remoteDataFetchFailed(ClientError(with: $0)) } ?? .remoteDataFetched @@ -1064,7 +1064,7 @@ public class ChatChannelController: DataController, DelegateCallable, DataStoreP return } - client.stopTrackingChannelController(self) + client.syncRepository.stopTrackingChannelController(self) updater.stopWatching(cid: cid) { error in self.state = error.map { .remoteDataFetchFailed(ClientError(with: $0)) } ?? .localDataFetched diff --git a/Sources/StreamChat/Controllers/ChannelListController/ChannelListController.swift b/Sources/StreamChat/Controllers/ChannelListController/ChannelListController.swift index a26b92188fd..555c896fe38 100644 --- a/Sources/StreamChat/Controllers/ChannelListController/ChannelListController.swift +++ b/Sources/StreamChat/Controllers/ChannelListController/ChannelListController.swift @@ -144,7 +144,7 @@ public class ChatChannelListController: DataController, DelegateCallable, DataSt override public func synchronize(_ completion: ((_ error: Error?) -> Void)? = nil) { startChannelListObserverIfNeeded() channelListLinker.start(with: client.eventNotificationCenter) - client.startTrackingChannelListController(self) + client.syncRepository.startTrackingChannelListController(self) updateChannelList(completion) } @@ -191,6 +191,11 @@ public class ChatChannelListController: DataController, DelegateCallable, DataSt // MARK: - Internal + func refreshLoadedChannels(completion: @escaping (Result, Error>) -> Void) { + let channelCount = channelListObserver.items.count + worker.refreshLoadedChannels(for: query, channelCount: channelCount, completion: completion) + } + func resetQuery( watchedAndSynchedChannelIds: Set, synchedChannelIds: Set, diff --git a/Sources/StreamChat/Repositories/SyncOperations.swift b/Sources/StreamChat/Repositories/SyncOperations.swift index 57d8d0fac9d..fb1097556a3 100644 --- a/Sources/StreamChat/Repositories/SyncOperations.swift +++ b/Sources/StreamChat/Repositories/SyncOperations.swift @@ -11,14 +11,105 @@ final class SyncContext { var synchedChannelIds: Set = Set() var watchedAndSynchedChannelIds: Set = Set() var unwantedChannelIds: Set = Set() + var missingEventSyncSuccessful = false init(lastSyncAt: Date) { self.lastSyncAt = lastSyncAt } + + var canRefreshChannelLists: Bool { + !missingEventSyncSuccessful + } } private let syncOperationsMaximumRetries = 2 +final class ActiveChannelIdsOperation: AsyncOperation { + init( + syncRepository: SyncRepository, + context: SyncContext + ) { + super.init(maxRetries: syncOperationsMaximumRetries) { [weak syncRepository] _, done in + guard let syncRepository else { + done(.continue) + return + } + + context.localChannelIds.append(contentsOf: syncRepository.activeChannelControllers.allObjects.compactMap(\.cid)) + context.localChannelIds.append(contentsOf: + syncRepository.activeChannelListControllers.allObjects + .map(\.channels) + .flatMap { $0 } + .map(\.cid) + ) + + // Main actor requirement + DispatchQueue.main.async { + context.localChannelIds.append(contentsOf: syncRepository.activeChats.allObjects.compactMap { try? $0.cid }) + context.localChannelIds.append(contentsOf: + syncRepository.activeChannelLists.allObjects + .map(\.state.channels) + .flatMap { $0 } + .map(\.cid) + ) + context.localChannelIds = Array(Set(context.localChannelIds)) + log.info("Found \(context.localChannelIds.count) active channels", subsystems: .offlineSupport) + done(.continue) + } + } + } +} + +final class RefreshChannelListOperation: AsyncOperation { + init(controller: ChatChannelListController, context: SyncContext) { + super.init(maxRetries: syncOperationsMaximumRetries) { [weak controller] _, done in + guard context.canRefreshChannelLists else { + done(.continue) + return + } + guard let controller = controller, controller.canBeRecovered else { + done(.continue) + return + } + controller.refreshLoadedChannels { result in + switch result { + case .success(let channelIds): + log.debug("Synced \(channelIds.count) channels in a channel list controller (\(controller.query.filter)", subsystems: .offlineSupport) + context.synchedChannelIds.formUnion(channelIds) + done(.continue) + case .failure(let error): + log.error("Failed refreshing channel list controller (\(controller.query.filter) with error \(error)", subsystems: .offlineSupport) + done(.retry) + } + } + } + } + + init(channelList: ChannelList, context: SyncContext) { + super.init(maxRetries: syncOperationsMaximumRetries) { [weak channelList] _, done in + guard context.canRefreshChannelLists else { + done(.continue) + return + } + guard let channelList else { + done(.continue) + return + } + Task { + do { + let channelIds = try await channelList.refreshLoadedChannels() + log.debug("Synced \(channelIds.count) channels in a channel list (\(channelList.query.filter)", subsystems: .offlineSupport) + context.synchedChannelIds.formUnion(channelIds) + done(.continue) + } catch { + log.error("Failed refreshing channel list (\(channelList.query.filter) with error \(error)", subsystems: .offlineSupport) + done(.retry) + } + } + } + } +} + final class GetChannelIdsOperation: AsyncOperation { init(database: DatabaseContainer, context: SyncContext, activeChannelIds: [ChannelId]) { super.init(maxRetries: syncOperationsMaximumRetries) { [weak database] _, done in @@ -39,7 +130,7 @@ final class GetChannelIdsOperation: AsyncOperation { } final class SyncEventsOperation: AsyncOperation { - init(syncRepository: SyncRepository, context: SyncContext) { + init(syncRepository: SyncRepository, context: SyncContext, recovery: Bool) { super.init(maxRetries: syncOperationsMaximumRetries) { [weak syncRepository] _, done in log.info( "1. Call `/sync` endpoint and get missing events for all locally existed channels", @@ -49,14 +140,16 @@ final class SyncEventsOperation: AsyncOperation { syncRepository?.syncChannelsEvents( channelIds: context.localChannelIds, lastSyncAt: context.lastSyncAt, - isRecovery: true + isRecovery: recovery ) { result in switch result { case let .success(channelIds): context.synchedChannelIds = Set(channelIds) + context.missingEventSyncSuccessful = true done(.continue) case let .failure(error): context.synchedChannelIds = Set([]) + context.missingEventSyncSuccessful = false done(error.shouldRetry ? .retry : .continue) } } @@ -182,7 +275,7 @@ final class DeleteUnwantedChannelsOperation: AsyncOperation { final class ExecutePendingOfflineActions: AsyncOperation { init(offlineRequestsRepository: OfflineRequestsRepository) { super.init(maxRetries: syncOperationsMaximumRetries) { [weak offlineRequestsRepository] _, done in - log.info("5. Running offline actions requests", subsystems: .offlineSupport) + log.info("Running offline actions requests", subsystems: .offlineSupport) offlineRequestsRepository?.runQueuedRequests { done(.continue) } diff --git a/Sources/StreamChat/Repositories/SyncRepository.swift b/Sources/StreamChat/Repositories/SyncRepository.swift index 328d424bb67..86cc44393ca 100644 --- a/Sources/StreamChat/Repositories/SyncRepository.swift +++ b/Sources/StreamChat/Repositories/SyncRepository.swift @@ -36,25 +36,25 @@ class SyncRepository { private let database: DatabaseContainer private let apiClient: APIClient private let channelListUpdater: ChannelListUpdater - let activeChannelControllers: ThreadSafeWeakCollection - let activeChannelListControllers: ThreadSafeWeakCollection + var usesV2Sync = StreamRuntimeCheck._isSyncV2Enabled let offlineRequestsRepository: OfflineRequestsRepository let eventNotificationCenter: EventNotificationCenter - private var _activeChannelListQueryProviders = [() -> ChannelListQuery?]() - private let syncQueue = DispatchQueue(label: "io.getstream.sync-repository-queue") - + let activeChannelControllers = ThreadSafeWeakCollection() + let activeChannelListControllers = ThreadSafeWeakCollection() + let activeChats = ThreadSafeWeakCollection() + let activeChannelLists = ThreadSafeWeakCollection() + private lazy var operationQueue: OperationQueue = { let operationQueue = OperationQueue() operationQueue.maxConcurrentOperationCount = 1 operationQueue.name = "com.stream.sync-repository" + operationQueue.qualityOfService = .utility return operationQueue }() init( config: ChatClientConfig, - activeChannelControllers: ThreadSafeWeakCollection, - activeChannelListControllers: ThreadSafeWeakCollection, offlineRequestsRepository: OfflineRequestsRepository, eventNotificationCenter: EventNotificationCenter, database: DatabaseContainer, @@ -62,8 +62,6 @@ class SyncRepository { channelListUpdater: ChannelListUpdater ) { self.config = config - self.activeChannelControllers = activeChannelControllers - self.activeChannelListControllers = activeChannelListControllers self.offlineRequestsRepository = offlineRequestsRepository self.channelListUpdater = channelListUpdater self.eventNotificationCenter = eventNotificationCenter @@ -77,20 +75,51 @@ class SyncRepository { // MARK: - Tracking Active - func trackChannelListQuery(_ provider: @escaping () -> ChannelListQuery?) { - syncQueue.sync { - _activeChannelListQueryProviders.append(provider) - } + func startTrackingChat(_ chat: Chat) { + guard !activeChats.contains(chat) else { return } + activeChats.add(chat) + } + + func stopTrackingChat(_ chat: Chat) { + activeChats.remove(chat) + } + + func startTrackingChannelController(_ controller: ChatChannelController) { + guard !activeChannelControllers.contains(controller) else { return } + activeChannelControllers.add(controller) + } + + func stopTrackingChannelController(_ controller: ChatChannelController) { + activeChannelControllers.remove(controller) + } + + func startTrackingChannelList(_ channelList: ChannelList) { + guard !activeChannelLists.contains(channelList) else { return } + activeChannelLists.add(channelList) + } + + func stopTrackingChannelList(_ channelList: ChannelList) { + activeChannelLists.remove(channelList) + } + + func startTrackingChannelListController(_ controller: ChatChannelListController) { + guard !activeChannelListControllers.contains(controller) else { return } + activeChannelListControllers.add(controller) + } + + func stopTrackingChannelListController(_ controller: ChatChannelListController) { + activeChannelListControllers.remove(controller) } func removeAllTracked() { - syncQueue.sync { - _activeChannelListQueryProviders.removeAll() - } + activeChats.removeAllObjects() + activeChannelControllers.removeAllObjects() + activeChannelLists.removeAllObjects() + activeChannelListControllers.removeAllObjects() } - // MARK: - - + // MARK: - Syncing + func syncLocalState(completion: @escaping () -> Void) { cancelRecoveryFlow() @@ -108,10 +137,77 @@ class SyncRepository { } return } - - self?.syncLocalState(lastSyncAt: lastSyncAt, completion: completion) + if self?.usesV2Sync == true { + self?.syncLocalStateV2(lastSyncAt: lastSyncAt, completion: completion) + } else { + self?.syncLocalState(lastSyncAt: lastSyncAt, completion: completion) + } } } + + // MARK: - V2 + + /// Runs offline tasks and updates the local state for channels + /// + /// Recovery mode (pauses regular API requests while it is running) + /// 1. Enter recovery + /// 2. Runs offline API requests + /// 3. Exit recovery + /// + /// Background mode (other regular API requests are allowed to run at the same time) + /// 1. Collect all the **active** channel ids (from instances of `Chat`, `ChannelList`, `ChatChannelController`, `ChatChannelListController`) + /// 2. Ask updates from the /sync endpoint for these channels + /// 2.a Apply changes + /// 2.b When there are too many events, just refresh channel lists (channels for current pages in `ChannelList`, `ChatChannelListController`) + private func syncLocalStateV2(lastSyncAt: Date, completion: @escaping () -> Void) { + let context = SyncContext(lastSyncAt: lastSyncAt) + var operations: [Operation] = [] + let start = CFAbsoluteTimeGetCurrent() + log.info("Starting to refresh offline state", subsystems: .offlineSupport) + + // + // Recovery mode operations (other API requests are paused) + // + if config.isLocalStorageEnabled { + apiClient.enterRecoveryMode() + operations.append(ExecutePendingOfflineActions(offlineRequestsRepository: offlineRequestsRepository)) + operations.append(BlockOperation(block: { [apiClient] in + apiClient.exitRecoveryMode() + })) + } + + // + // Background mode operations + // + + /// 1. Collect all the **active** channel ids + operations.append(ActiveChannelIdsOperation(syncRepository: self, context: context)) + + // 2. /sync + operations.append(SyncEventsOperation(syncRepository: self, context: context, recovery: false)) + + // 2.b.1 (these run only if sync was not possible in the previous step) + operations.append(contentsOf: activeChannelLists.allObjects.map { RefreshChannelListOperation(channelList: $0, context: context) }) + operations.append(contentsOf: activeChannelListControllers.allObjects.map { RefreshChannelListOperation(controller: $0, context: context) }) + + operations.append(BlockOperation(block: { + let duration = CFAbsoluteTimeGetCurrent() - start + log.info("Finished refreshing offline state (\(context.synchedChannelIds.count) channels in \(String(format: "%.1f", duration)) seconds)", subsystems: .offlineSupport) + DispatchQueue.main.async { + completion() + } + })) + + var previousOperation: Operation? + operations.reversed().forEach { operation in + defer { previousOperation = operation } + guard let previousOperation = previousOperation else { return } + previousOperation.addDependency(operation) + } + operationQueue.addOperations(operations, waitUntilFinished: false) + } + + // MARK: - V1 /// Syncs the local state with the server to make sure the local database is up to date. /// It features queuing, serialization and retries @@ -142,7 +238,7 @@ class SyncRepository { operations.append(GetChannelIdsOperation(database: database, context: context, activeChannelIds: activeChannelIds)) // 1. Call `/sync` endpoint and get missing events for all locally existed channels - operations.append(SyncEventsOperation(syncRepository: self, context: context)) + operations.append(SyncEventsOperation(syncRepository: self, context: context, recovery: true)) // 2. Start watching open channels. let watchChannelOperations: [AsyncOperation] = activeChannelControllers.allObjects.map { controller in @@ -162,13 +258,13 @@ class SyncRepository { } operations.append(contentsOf: refetchChannelListQueryOperations) - let channelListQueries = syncQueue.sync { - let queries = _activeChannelListQueryProviders - .compactMap { $0() } + let channelListQueries: [ChannelListQuery] = { + let queries = activeChannelLists.allObjects + .map(\.query) .map { ($0.filter.filterHash, $0) } let uniqueQueries = Dictionary(queries, uniquingKeysWith: { _, last in last }) return Array(uniqueQueries.values) - } + }() operations.append(contentsOf: channelListQueries .map { channelListQuery in RefetchChannelListQueryOperation( @@ -204,11 +300,15 @@ class SyncRepository { /// Syncs the events for the active chat channels using the last sync date. /// - Parameter completion: A block that will get executed upon completion of the synchronization func syncExistingChannelsEvents(completion: @escaping (Result<[ChannelId], SyncError>) -> Void) { - getUser { [weak self] currentUser in + getUser { [weak self, syncCooldown] currentUser in guard let lastSyncAt = currentUser?.lastSynchedEventDate?.bridgeDate else { completion(.failure(.noNeedToSync)) return } + guard Date().timeIntervalSince(lastSyncAt) > syncCooldown else { + completion(.failure(.noNeedToSync)) + return + } self?.getChannelIds { channelIds in self?.syncChannelsEvents( @@ -253,13 +353,6 @@ class SyncRepository { return } - // In recovery mode, `/sync` should always be called. - // Otherwise, the cooldown is checked. - guard isRecovery || Date().timeIntervalSince(lastSyncAt) > syncCooldown else { - completion(.failure(.noNeedToSync)) - return - } - syncMissingEvents( using: lastSyncAt, channelIds: channelIds, diff --git a/Sources/StreamChat/StateLayer/ChannelList.swift b/Sources/StreamChat/StateLayer/ChannelList.swift index 981414f44af..09211151b1c 100644 --- a/Sources/StreamChat/StateLayer/ChannelList.swift +++ b/Sources/StreamChat/StateLayer/ChannelList.swift @@ -7,6 +7,7 @@ import Foundation /// An object which represents a list of `ChatChannel`s for the specified channel query. public class ChannelList { private let channelListUpdater: ChannelListUpdater + private let client: ChatClient private let stateBuilder: StateBuilder let query: ChannelListQuery @@ -16,6 +17,7 @@ public class ChannelList { client: ChatClient, environment: Environment = .init() ) { + self.client = client self.query = query let channelListUpdater = environment.channelListUpdater( client.databaseContainer, @@ -47,6 +49,7 @@ public class ChannelList { public func get() async throws { let pagination = Pagination(pageSize: query.pagination.pageSize) try await loadChannels(with: pagination) + client.syncRepository.startTrackingChannelList(self) } // MARK: - Channel List Pagination @@ -78,6 +81,13 @@ public class ChannelList { loadedChannelsCount: count ) } + + // MARK: - Internal + + func refreshLoadedChannels() async throws -> Set { + let count = await state.channels.count + return try await channelListUpdater.refreshLoadedChannels(for: query, channelCount: count) + } } extension ChannelList { diff --git a/Sources/StreamChat/StateLayer/Chat.swift b/Sources/StreamChat/StateLayer/Chat.swift index febc80c381f..4476f6dcfea 100644 --- a/Sources/StreamChat/StateLayer/Chat.swift +++ b/Sources/StreamChat/StateLayer/Chat.swift @@ -87,6 +87,7 @@ public class Chat { channelQuery: query, memberSorting: state.memberSorting ) + client.syncRepository.startTrackingChat(self) // cid is retrieved from the server when we are creating new channels or there is no local state present guard query.cid != payload.channel.cid else { return } await state.setChannelId(payload.channel.cid) @@ -102,8 +103,8 @@ public class Chat { /// /// - Throws: An error while communicating with the Stream API. public func watch() async throws { - // Note that watching is started in ChatClient+Chat when channel updater's update is called. try await channelUpdater.startWatching(cid: cid, isInRecoveryMode: false) + client.syncRepository.startTrackingChat(self) } /// Stop watching the channel which disables server-side events. @@ -113,6 +114,7 @@ public class Chat { /// - Throws: An error while communicating with the Stream API. public func stopWatching() async throws { try await channelUpdater.stopWatching(cid: cid) + client.syncRepository.stopTrackingChat(self) } // MARK: - Deleting the Channel diff --git a/Sources/StreamChat/StateLayer/ChatClient+Factory.swift b/Sources/StreamChat/StateLayer/ChatClient+Factory.swift index bdf37a1e3c3..11306775106 100644 --- a/Sources/StreamChat/StateLayer/ChatClient+Factory.swift +++ b/Sources/StreamChat/StateLayer/ChatClient+Factory.swift @@ -35,9 +35,7 @@ extension ChatClient { with query: ChannelListQuery, dynamicFilter: ((ChatChannel) -> Bool)? = nil ) -> ChannelList { - let channelList = ChannelList(query: query, dynamicFilter: dynamicFilter, client: self) - syncRepository.trackChannelListQuery { [weak channelList] in channelList?.query } - return channelList + ChannelList(query: query, dynamicFilter: dynamicFilter, client: self) } } diff --git a/Sources/StreamChat/Workers/ChannelListUpdater.swift b/Sources/StreamChat/Workers/ChannelListUpdater.swift index a97c6dd261d..0c68b991878 100644 --- a/Sources/StreamChat/Workers/ChannelListUpdater.swift +++ b/Sources/StreamChat/Workers/ChannelListUpdater.swift @@ -41,6 +41,56 @@ class ChannelListUpdater: Worker { } } + func refreshLoadedChannels(for query: ChannelListQuery, channelCount: Int, completion: @escaping (Result, Error>) -> Void) { + var allPages = [ChannelListQuery]() + for offset in stride(from: 0, to: channelCount, by: .channelsPageSize) { + var pageQuery = query + pageQuery.pagination = Pagination(pageSize: .channelsPageSize, offset: offset) + allPages.append(pageQuery) + } + refreshLoadedChannels(for: allPages, refreshedChannelIds: Set(), completion: completion) + } + + func refreshLoadedChannels(for query: ChannelListQuery, channelCount: Int) async throws -> Set { + try await withCheckedThrowingContinuation { continuation in + refreshLoadedChannels(for: query, channelCount: channelCount) { result in + continuation.resume(with: result) + } + } + } + + private func refreshLoadedChannels(for pageQueries: [ChannelListQuery], refreshedChannelIds: Set, completion: @escaping (Result, Error>) -> Void) { + guard let nextQuery = pageQueries.first else { + completion(.success(refreshedChannelIds)) + return + } + + let remaining = pageQueries.dropFirst() + fetch(channelListQuery: nextQuery) { [weak self] result in + switch result { + case .success(let channelListPayload): + self?.writeChannelListPayload( + payload: channelListPayload, + query: nextQuery, + completion: { writeResult in + switch writeResult { + case .success(let writtenChannels): + self?.refreshLoadedChannels( + for: Array(remaining), + refreshedChannelIds: refreshedChannelIds.union(writtenChannels.map(\.cid)), + completion: completion + ) + case .failure(let error): + completion(.failure(error)) + } + } + ) + case .failure(let error): + completion(.failure(error)) + } + } + } + func resetChannelsQuery( for query: ChannelListQuery, pageSize: Int, diff --git a/TestTools/StreamChatTestTools/Mocks/StreamChat/Controllers/ChatChannelListController_Mock.swift b/TestTools/StreamChatTestTools/Mocks/StreamChat/Controllers/ChatChannelListController_Mock.swift index 489fdec0b67..c2f374a990e 100644 --- a/TestTools/StreamChatTestTools/Mocks/StreamChat/Controllers/ChatChannelListController_Mock.swift +++ b/TestTools/StreamChatTestTools/Mocks/StreamChat/Controllers/ChatChannelListController_Mock.swift @@ -10,6 +10,7 @@ public class ChatChannelListController_Mock: ChatChannelListController, Spy { public var loadNextChannelsIsCalled = false public var loadNextChannelsCallCount = 0 public var resetChannelsQueryResult: Result<(synchedAndWatched: [ChatChannel], unwanted: Set), Error>? + public var refreshLoadedChannelsResult: Result, any Error>? /// Creates a new mock instance of `ChatChannelListController`. public static func mock(client: ChatClient? = nil) -> ChatChannelListController_Mock { @@ -32,6 +33,11 @@ public class ChatChannelListController_Mock: ChatChannelListController, Spy { loadNextChannelsIsCalled = true } + override public func refreshLoadedChannels(completion: @escaping (Result, any Error>) -> Void) { + record() + refreshLoadedChannelsResult.map(completion) + } + override public func resetQuery( watchedAndSynchedChannelIds: Set, synchedChannelIds: Set, diff --git a/TestTools/StreamChatTestTools/Mocks/StreamChat/Repositories/SyncRepository_Mock.swift b/TestTools/StreamChatTestTools/Mocks/StreamChat/Repositories/SyncRepository_Mock.swift index 4c2e0d4637a..33c0d2a7c6d 100644 --- a/TestTools/StreamChatTestTools/Mocks/StreamChat/Repositories/SyncRepository_Mock.swift +++ b/TestTools/StreamChatTestTools/Mocks/StreamChat/Repositories/SyncRepository_Mock.swift @@ -17,8 +17,6 @@ final class SyncRepository_Mock: SyncRepository, Spy { let apiClient = APIClient_Spy() let database = DatabaseContainer_Spy() self.init(config: .init(apiKeyString: ""), - activeChannelControllers: ThreadSafeWeakCollection(), - activeChannelListControllers: ThreadSafeWeakCollection(), offlineRequestsRepository: OfflineRequestsRepository_Mock(), eventNotificationCenter: EventNotificationCenter_Mock(database: database), database: database, @@ -26,8 +24,22 @@ final class SyncRepository_Mock: SyncRepository, Spy { channelListUpdater: ChannelListUpdater_Spy(database: database, apiClient: apiClient)) } - override init(config: ChatClientConfig, activeChannelControllers: ThreadSafeWeakCollection, activeChannelListControllers: ThreadSafeWeakCollection, offlineRequestsRepository: OfflineRequestsRepository, eventNotificationCenter: EventNotificationCenter, database: DatabaseContainer, apiClient: APIClient, channelListUpdater: ChannelListUpdater) { - super.init(config: config, activeChannelControllers: activeChannelControllers, activeChannelListControllers: activeChannelListControllers, offlineRequestsRepository: offlineRequestsRepository, eventNotificationCenter: eventNotificationCenter, database: database, apiClient: apiClient, channelListUpdater: channelListUpdater) + override init( + config: ChatClientConfig, + offlineRequestsRepository: OfflineRequestsRepository, + eventNotificationCenter: EventNotificationCenter, + database: DatabaseContainer, + apiClient: APIClient, + channelListUpdater: ChannelListUpdater + ) { + super.init( + config: config, + offlineRequestsRepository: offlineRequestsRepository, + eventNotificationCenter: eventNotificationCenter, + database: database, + apiClient: apiClient, + channelListUpdater: channelListUpdater + ) } override func syncLocalState(completion: @escaping () -> Void) { diff --git a/TestTools/StreamChatTestTools/SpyPattern/Spy/APIClient_Spy.swift b/TestTools/StreamChatTestTools/SpyPattern/Spy/APIClient_Spy.swift index dd6bd131282..96ce09bf784 100644 --- a/TestTools/StreamChatTestTools/SpyPattern/Spy/APIClient_Spy.swift +++ b/TestTools/StreamChatTestTools/SpyPattern/Spy/APIClient_Spy.swift @@ -183,6 +183,7 @@ final class APIClient_Spy: APIClient, Spy { @discardableResult func waitForRecoveryRequest(timeout: Double = defaultTimeout) -> AnyEndpoint? { XCTWaiter().wait(for: [recoveryRequest_expectation], timeout: timeout) + recoveryRequest_expectation = XCTestExpectation() return recoveryRequest_endpoint } diff --git a/Tests/StreamChatTests/ChatClient_Tests.swift b/Tests/StreamChatTests/ChatClient_Tests.swift index 07380cb27dc..d7d6172a0b5 100644 --- a/Tests/StreamChatTests/ChatClient_Tests.swift +++ b/Tests/StreamChatTests/ChatClient_Tests.swift @@ -307,11 +307,11 @@ final class ChatClient_Tests: XCTestCase { ) let connectionRepository = try XCTUnwrap(client.connectionRepository as? ConnectionRepository_Mock) connectionRepository.disconnectResult = .success(()) - client.startTrackingChannelController(ChannelControllerSpy()) - client.startTrackingChannelListController(ChatChannelListController_Mock.mock()) + client.syncRepository.startTrackingChannelController(ChannelControllerSpy()) + client.syncRepository.startTrackingChannelListController(ChatChannelListController_Mock.mock()) - XCTAssertEqual(client.activeChannelControllers.count, 1) - XCTAssertEqual(client.activeChannelListControllers.count, 1) + XCTAssertEqual(client.syncRepository.activeChannelControllers.count, 1) + XCTAssertEqual(client.syncRepository.activeChannelListControllers.count, 1) // WHEN let expectation = self.expectation(description: "logout completes") @@ -321,8 +321,8 @@ final class ChatClient_Tests: XCTestCase { waitForExpectations(timeout: defaultTimeout) // THEN - XCTAssertEqual(client.activeChannelControllers.count, 0) - XCTAssertEqual(client.activeChannelListControllers.count, 0) + XCTAssertEqual(client.syncRepository.activeChannelControllers.count, 0) + XCTAssertEqual(client.syncRepository.activeChannelListControllers.count, 0) } func test_apiClient_usesInjectedURLSessionConfiguration() { @@ -824,70 +824,6 @@ final class ChatClient_Tests: XCTestCase { XCTAssertEqual(streamHeader, SystemEnvironment.xStreamClientHeader) } - - // MARK: - Active Controller - - func test_startTrackingChannelController() { - let client = ChatClient(config: .init()) - - let controller = ChatChannelController_Mock.mock() - client.startTrackingChannelController(controller) - - XCTAssertTrue(client.activeChannelControllers.allObjects.first === controller) - } - - func test_startTrackingChannelController_whenAlreadyExists_thenDoNotDuplicate() { - let client = ChatClient(config: .init()) - - let controller = ChatChannelController_Mock.mock() - client.startTrackingChannelController(controller) - client.startTrackingChannelController(controller) - - XCTAssertTrue(client.activeChannelControllers.allObjects.first === controller) - XCTAssertEqual(client.activeChannelControllers.allObjects.count, 1) - } - - func test_stopTrackingChannelController() { - let client = ChatClient(config: .init()) - let controller = ChatChannelController_Mock.mock() - client.startTrackingChannelController(controller) - XCTAssertEqual(client.activeChannelControllers.allObjects.count, 1) - - client.stopTrackingChannelController(controller) - - XCTAssertTrue(client.activeChannelControllers.allObjects.isEmpty) - } - - func test_startTrackingChannelListController() { - let client = ChatClient(config: .init()) - - let controller = ChatChannelListController_Mock.mock() - client.startTrackingChannelListController(controller) - - XCTAssertTrue(client.activeChannelListControllers.allObjects.first === controller) - } - - func test_startTrackingChannelListController_whenAlreadyExists_thenDoNotDuplicate() { - let client = ChatClient(config: .init()) - - let controller = ChatChannelListController_Mock.mock() - client.startTrackingChannelListController(controller) - client.startTrackingChannelListController(controller) - - XCTAssertTrue(client.activeChannelListControllers.allObjects.first === controller) - XCTAssertEqual(client.activeChannelListControllers.allObjects.count, 1) - } - - func test_stopTrackingChannelListController() { - let client = ChatClient(config: .init()) - let controller = ChatChannelListController_Mock.mock() - client.startTrackingChannelListController(controller) - XCTAssertEqual(client.activeChannelListControllers.allObjects.count, 1) - - client.stopTrackingChannelListController(controller) - - XCTAssertTrue(client.activeChannelListControllers.allObjects.isEmpty) - } } final class TestWorker: Worker { diff --git a/Tests/StreamChatTests/Controllers/ChannelController/ChannelController_Tests.swift b/Tests/StreamChatTests/Controllers/ChannelController/ChannelController_Tests.swift index b06c41534a2..4a001fb276e 100644 --- a/Tests/StreamChatTests/Controllers/ChannelController/ChannelController_Tests.swift +++ b/Tests/StreamChatTests/Controllers/ChannelController/ChannelController_Tests.swift @@ -4597,12 +4597,12 @@ final class ChannelController_Tests: XCTestCase { channelListQuery: channelListQuery, client: client ) - XCTAssert(client.activeChannelControllers.allObjects.isEmpty) + XCTAssert(client.syncRepository.activeChannelControllers.allObjects.isEmpty) controller.startWatching(isInRecoveryMode: false) XCTAssert(controller.client === client) - XCTAssert(client.activeChannelControllers.count == 1) - XCTAssert(client.activeChannelControllers.allObjects.first === controller) + XCTAssert(client.syncRepository.activeChannelControllers.count == 1) + XCTAssert(client.syncRepository.activeChannelControllers.allObjects.first === controller) } func test_startWatching_propagatesErrorFromUpdater() { @@ -4783,10 +4783,10 @@ final class ChannelController_Tests: XCTestCase { client: client ) controller.synchronize() - XCTAssert(client.activeChannelControllers.count == 1) + XCTAssert(client.syncRepository.activeChannelControllers.count == 1) controller.stopWatching() - XCTAssert(client.activeChannelControllers.allObjects.isEmpty == true) + XCTAssert(client.syncRepository.activeChannelControllers.allObjects.isEmpty == true) } // MARK: - Freeze channel @@ -5137,8 +5137,8 @@ final class ChannelController_Tests: XCTestCase { controller.synchronize() XCTAssert(controller.client === client) - XCTAssert(client.activeChannelControllers.count == 1) - XCTAssert(client.activeChannelControllers.allObjects.first === controller) + XCTAssert(client.syncRepository.activeChannelControllers.count == 1) + XCTAssert(client.syncRepository.activeChannelControllers.allObjects.first === controller) } // MARK: shouldSendTypingEvents diff --git a/Tests/StreamChatTests/Controllers/ChannelListController/ChannelListController_Tests.swift b/Tests/StreamChatTests/Controllers/ChannelListController/ChannelListController_Tests.swift index 331ef311d39..3fbc63f7155 100644 --- a/Tests/StreamChatTests/Controllers/ChannelListController/ChannelListController_Tests.swift +++ b/Tests/StreamChatTests/Controllers/ChannelListController/ChannelListController_Tests.swift @@ -1014,8 +1014,8 @@ final class ChannelListController_Tests: XCTestCase { controller.synchronize() XCTAssert(controller.client === client) - XCTAssert(client.activeChannelListControllers.count == 1) - XCTAssert(client.activeChannelListControllers.allObjects.first === controller) + XCTAssert(client.syncRepository.activeChannelListControllers.count == 1) + XCTAssert(client.syncRepository.activeChannelListControllers.allObjects.first === controller) } // MARK: Predicates diff --git a/Tests/StreamChatTests/Repositories/SyncOperations_Tests.swift b/Tests/StreamChatTests/Repositories/SyncOperations_Tests.swift index 5e1cb65c45d..913dc998e56 100644 --- a/Tests/StreamChatTests/Repositories/SyncOperations_Tests.swift +++ b/Tests/StreamChatTests/Repositories/SyncOperations_Tests.swift @@ -80,7 +80,7 @@ final class SyncOperations_Tests: XCTestCase { try database.writeSynchronously { session in session.currentUser?.lastSynchedEventDate = originalDate.bridgeDate } - let operation = SyncEventsOperation(syncRepository: syncRepository, context: context) + let operation = SyncEventsOperation(syncRepository: syncRepository, context: context, recovery: false) syncRepository.syncMissingEventsResult = .failure(.syncEndpointFailed(ClientError(""))) operation.startAndWaitForCompletion() @@ -101,7 +101,7 @@ final class SyncOperations_Tests: XCTestCase { session.currentUser?.lastSynchedEventDate = DBDate().addingTimeInterval(-3600) } - let operation = SyncEventsOperation(syncRepository: syncRepository, context: context) + let operation = SyncEventsOperation(syncRepository: syncRepository, context: context, recovery: false) syncRepository.syncMissingEventsResult = .success([.unique, .unique]) operation.startAndWaitForCompletion() diff --git a/Tests/StreamChatTests/Repositories/SyncRepository_Tests.swift b/Tests/StreamChatTests/Repositories/SyncRepository_Tests.swift index 24dd5665896..185c4b1b5d7 100644 --- a/Tests/StreamChatTests/Repositories/SyncRepository_Tests.swift +++ b/Tests/StreamChatTests/Repositories/SyncRepository_Tests.swift @@ -6,9 +6,14 @@ @testable import StreamChatTestTools import XCTest -final class SyncRepository_Tests: XCTestCase { - var _activeChannelControllers: ThreadSafeWeakCollection! - var _activeChannelListControllers: ThreadSafeWeakCollection! +class SyncRepositoryV2_Tests: SyncRepository_Tests { + override func setUp() { + super.setUp() + repository.usesV2Sync = true + } +} + +class SyncRepository_Tests: XCTestCase { var client: ChatClient_Mock! var offlineRequestsRepository: OfflineRequestsRepository_Mock! var database: DatabaseContainer_Spy! @@ -24,8 +29,6 @@ final class SyncRepository_Tests: XCTestCase { override func setUp() { super.setUp() - _activeChannelControllers = ThreadSafeWeakCollection() - _activeChannelListControllers = ThreadSafeWeakCollection() var config = ChatClientConfig(apiKeyString: .unique) config.isLocalStorageEnabled = true client = ChatClient_Mock(config: config) @@ -41,20 +44,17 @@ final class SyncRepository_Tests: XCTestCase { repository = SyncRepository( config: client.config, - activeChannelControllers: _activeChannelControllers, - activeChannelListControllers: _activeChannelListControllers, offlineRequestsRepository: offlineRequestsRepository, eventNotificationCenter: client.eventNotificationCenter, database: database, apiClient: apiClient, channelListUpdater: channelListUpdater ) + repository.usesV2Sync = false } override func tearDown() { super.tearDown() - _activeChannelControllers = nil - _activeChannelListControllers = nil client.cleanUp() client = nil offlineRequestsRepository.clear() @@ -93,8 +93,6 @@ final class SyncRepository_Tests: XCTestCase { let client = ChatClient_Mock(config: config) repository = SyncRepository( config: client.config, - activeChannelControllers: _activeChannelControllers, - activeChannelListControllers: _activeChannelListControllers, offlineRequestsRepository: offlineRequestsRepository, eventNotificationCenter: repository.eventNotificationCenter, database: database, @@ -127,8 +125,6 @@ final class SyncRepository_Tests: XCTestCase { let client = ChatClient_Mock(config: config) repository = SyncRepository( config: client.config, - activeChannelControllers: _activeChannelControllers, - activeChannelListControllers: _activeChannelListControllers, offlineRequestsRepository: offlineRequestsRepository, eventNotificationCenter: repository.eventNotificationCenter, database: database, @@ -171,6 +167,8 @@ final class SyncRepository_Tests: XCTestCase { } func test_syncLocalState_localStorageEnabled_pendingConnectionDate_channels() throws { + try XCTSkipIf(repository.usesV2Sync, "V2 only syncs if there are active controllers") + let channelId = ChannelId.unique try prepareForSyncLocalStorage( createUser: true, @@ -206,7 +204,7 @@ final class SyncRepository_Tests: XCTestCase { let chatController = ChatChannelController_Spy(client: client) chatController.state = .remoteDataFetched - _activeChannelControllers.add(chatController) + repository.startTrackingChannelController(chatController) let eventDate = Date.unique waitForSyncLocalStateRun(requestResult: .success(messageEventPayload(cid: cid, with: [eventDate]))) @@ -216,10 +214,17 @@ final class SyncRepository_Tests: XCTestCase { // Write: API Response, lastSyncAt XCTAssertEqual(database.writeSessionCounter, 2) XCTAssertEqual(repository.activeChannelControllers.count, 1) - XCTAssertCall("recoverWatchedChannel(completion:)", on: chatController, times: 1) + if !repository.usesV2Sync { + XCTAssertCall("recoverWatchedChannel(completion:)", on: chatController, times: 1) + } XCTAssertEqual(repository.activeChannelListControllers.count, 0) - XCTAssertEqual(apiClient.recoveryRequest_allRecordedCalls.count, 1) - XCTAssertEqual(apiClient.request_allRecordedCalls.count, 0) + if repository.usesV2Sync { + XCTAssertEqual(apiClient.recoveryRequest_allRecordedCalls.count, 0) + XCTAssertEqual(apiClient.request_allRecordedCalls.count, 1) + } else { + XCTAssertEqual(apiClient.recoveryRequest_allRecordedCalls.count, 1) + XCTAssertEqual(apiClient.request_allRecordedCalls.count, 0) + } XCTAssertCall("runQueuedRequests(completion:)", on: offlineRequestsRepository, times: 1) } @@ -234,8 +239,13 @@ final class SyncRepository_Tests: XCTestCase { let chatListController = ChatChannelListController_Mock(query: .init(filter: .exists(.cid)), client: client) chatListController.state = .remoteDataFetched - _activeChannelListControllers.add(chatListController) - chatListController.resetChannelsQueryResult = .success(([], [])) + chatListController.channels_mock = [.mock(cid: cid)] + repository.startTrackingChannelListController(chatListController) + if repository.usesV2Sync { + chatListController.refreshLoadedChannelsResult = .success(Set()) + } else { + chatListController.resetChannelsQueryResult = .success(([], [])) + } let eventDate = Date.unique waitForSyncLocalStateRun(requestResult: .success(messageEventPayload(cid: cid, with: [eventDate]))) @@ -246,17 +256,28 @@ final class SyncRepository_Tests: XCTestCase { XCTAssertEqual(database.writeSessionCounter, 2) XCTAssertEqual(repository.activeChannelControllers.count, 0) XCTAssertEqual(repository.activeChannelListControllers.count, 1) - XCTAssertCall( - "resetQuery(watchedAndSynchedChannelIds:synchedChannelIds:completion:)", on: chatListController, - times: 1 - ) - XCTAssertEqual(apiClient.recoveryRequest_allRecordedCalls.count, 1) - XCTAssertEqual(apiClient.request_allRecordedCalls.count, 0) + if repository.usesV2Sync { + // refresh can only happen when /sync fails + XCTAssertNotCall( + "refreshLoadedChannels(completion:)", on: chatListController + ) + XCTAssertEqual(apiClient.recoveryRequest_allRecordedCalls.count, 0) + XCTAssertEqual(apiClient.request_allRecordedCalls.count, 1) + } else { + XCTAssertCall( + "resetQuery(watchedAndSynchedChannelIds:synchedChannelIds:completion:)", on: chatListController, + times: 1 + ) + XCTAssertEqual(apiClient.recoveryRequest_allRecordedCalls.count, 1) + XCTAssertEqual(apiClient.request_allRecordedCalls.count, 0) + } XCTAssertCall("runQueuedRequests(completion:)", on: offlineRequestsRepository, times: 1) } func test_syncLocalState_localStorageEnabled_pendingConnectionDate_channels_activeRemoteChannelListController_unwantedChannels( ) throws { + try XCTSkipIf(repository.usesV2Sync, "V2 does not handle unwanted channels") + let cid = ChannelId.unique try prepareForSyncLocalStorage( createUser: true, @@ -267,7 +288,7 @@ final class SyncRepository_Tests: XCTestCase { let chatListController = ChatChannelListController_Mock(query: .init(filter: .exists(.cid)), client: client) chatListController.state = .remoteDataFetched - _activeChannelListControllers.add(chatListController) + repository.startTrackingChannelListController(chatListController) let unwantedId = ChannelId.unique chatListController.resetChannelsQueryResult = .success(([], [unwantedId])) @@ -298,6 +319,14 @@ final class SyncRepository_Tests: XCTestCase { createChannel: true, cid: cid ) + + // At least one active controller is needed for sync to happen + let chatListController = ChatChannelListController_Mock(query: .init(filter: .exists(.cid)), client: client) + if repository.usesV2Sync { + chatListController.state = .remoteDataFetched + chatListController.channels_mock = [.mock(cid: cid)] + repository.startTrackingChannelListController(chatListController) + } let firstDate = lastSyncDate.addingTimeInterval(1) let secondDate = lastSyncDate.addingTimeInterval(2) @@ -311,6 +340,8 @@ final class SyncRepository_Tests: XCTestCase { waitForSyncLocalStateRun(requestResult: .success(eventsPayload2)) XCTAssertNearlySameDate(lastSyncAtValue, thirdDate) + + repository.stopTrackingChannelListController(chatListController) } // MARK: - Sync existing channels events @@ -533,8 +564,6 @@ final class SyncRepository_Tests: XCTestCase { let client = ChatClient_Mock(config: config) repository = SyncRepository( config: client.config, - activeChannelControllers: _activeChannelControllers, - activeChannelListControllers: _activeChannelListControllers, offlineRequestsRepository: offlineRequestsRepository, eventNotificationCenter: repository.eventNotificationCenter, database: database, @@ -561,8 +590,6 @@ final class SyncRepository_Tests: XCTestCase { let client = ChatClient_Mock(config: config) repository = SyncRepository( config: client.config, - activeChannelControllers: _activeChannelControllers, - activeChannelListControllers: _activeChannelListControllers, offlineRequestsRepository: offlineRequestsRepository, eventNotificationCenter: repository.eventNotificationCenter, database: database, @@ -600,8 +627,6 @@ final class SyncRepository_Tests: XCTestCase { // GIVEN let mock = CancelRecoveryFlowTracker( config: client.config, - activeChannelControllers: _activeChannelControllers, - activeChannelListControllers: _activeChannelListControllers, offlineRequestsRepository: offlineRequestsRepository, eventNotificationCenter: repository.eventNotificationCenter, database: database, @@ -625,8 +650,6 @@ final class SyncRepository_Tests: XCTestCase { // GIVEN var mock: CancelRecoveryFlowTracker? = .init( config: client.config, - activeChannelControllers: _activeChannelControllers, - activeChannelListControllers: _activeChannelListControllers, offlineRequestsRepository: offlineRequestsRepository, eventNotificationCenter: repository.eventNotificationCenter, database: database, @@ -658,12 +681,12 @@ final class SyncRepository_Tests: XCTestCase { let channelQuery = ChannelQuery(cid: .unique) let channelController = ChatChannelController(channelQuery: channelQuery, channelListQuery: nil, client: client) channelController.state = .remoteDataFetched - _activeChannelControllers.add(channelController) + repository.startTrackingChannelController(channelController) // Add active channel list component let channelListController = ChatChannelListController_Mock(query: .init(filter: .exists(.cid)), client: client) channelListController.state = .remoteDataFetched - _activeChannelListControllers.add(channelListController) + repository.startTrackingChannelListController(channelListController) // Sync local state var completionCalled = false @@ -672,7 +695,11 @@ final class SyncRepository_Tests: XCTestCase { } // Wait for /sync to be called - AssertAsync.willBeTrue(apiClient.recoveryRequest_completion != nil) + if repository.usesV2Sync { + apiClient.waitForRequest() + } else { + apiClient.waitForRecoveryRequest() + } // Let /sync operation to complete let syncResponse = Result.success(.init(eventPayloads: [])) @@ -680,7 +707,9 @@ final class SyncRepository_Tests: XCTestCase { apiClient.recoveryRequest_completion = nil // Wait for watch operation - AssertAsync.willBeTrue(apiClient.recoveryRequest_completion != nil) + if !repository.usesV2Sync { + AssertAsync.willBeTrue(apiClient.recoveryRequest_completion != nil) + } // Cancel recovery flow repository.cancelRecoveryFlow() @@ -695,6 +724,60 @@ final class SyncRepository_Tests: XCTestCase { Assert.staysFalse(completionCalled) } } + + // MARK: - Tracking + + func test_startTrackingChannelController() { + let controller = ChatChannelController_Mock.mock() + repository.startTrackingChannelController(controller) + + XCTAssertTrue(repository.activeChannelControllers.allObjects.first === controller) + } + + func test_startTrackingChannelController_whenAlreadyExists_thenDoNotDuplicate() { + let controller = ChatChannelController_Mock.mock() + repository.startTrackingChannelController(controller) + repository.startTrackingChannelController(controller) + + XCTAssertTrue(repository.activeChannelControllers.allObjects.first === controller) + XCTAssertEqual(repository.activeChannelControllers.allObjects.count, 1) + } + + func test_stopTrackingChannelController() { + let controller = ChatChannelController_Mock.mock() + repository.startTrackingChannelController(controller) + XCTAssertEqual(repository.activeChannelControllers.allObjects.count, 1) + + repository.stopTrackingChannelController(controller) + + XCTAssertTrue(repository.activeChannelControllers.allObjects.isEmpty) + } + + func test_startTrackingChannelListController() { + let controller = ChatChannelListController_Mock.mock() + repository.startTrackingChannelListController(controller) + + XCTAssertTrue(repository.activeChannelListControllers.allObjects.first === controller) + } + + func test_startTrackingChannelListController_whenAlreadyExists_thenDoNotDuplicate() { + let controller = ChatChannelListController_Mock.mock() + repository.startTrackingChannelListController(controller) + repository.startTrackingChannelListController(controller) + + XCTAssertTrue(repository.activeChannelListControllers.allObjects.first === controller) + XCTAssertEqual(repository.activeChannelListControllers.allObjects.count, 1) + } + + func test_stopTrackingChannelListController() { + let controller = ChatChannelListController_Mock.mock() + repository.startTrackingChannelListController(controller) + XCTAssertEqual(repository.activeChannelListControllers.allObjects.count, 1) + + repository.stopTrackingChannelListController(controller) + + XCTAssertTrue(repository.activeChannelListControllers.allObjects.isEmpty) + } } extension SyncRepository_Tests { @@ -741,21 +824,32 @@ extension SyncRepository_Tests { expectation.fulfill() } - AssertAsync.willBeTrue( - "enterRecoveryMode()".wasCalled(on: apiClient, times: 1) - ) + if !repository.usesV2Sync { + AssertAsync.willBeTrue( + "enterRecoveryMode()".wasCalled(on: apiClient, times: 1) + ) + } if let result = requestResult { // Simulate API Failure - AssertAsync.willBeTrue(apiClient.recoveryRequest_completion != nil) - guard let callback = apiClient.recoveryRequest_completion as? (Result) -> Void else { - XCTFail("A request for /sync should have been executed") - return + if repository.usesV2Sync { + apiClient.waitForRequest() + guard let callback = apiClient.request_completion as? (Result) -> Void else { + XCTFail("A request for /sync should have been executed") + return + } + callback(result) + } else { + apiClient.waitForRecoveryRequest() + guard let callback = apiClient.recoveryRequest_completion as? (Result) -> Void else { + XCTFail("A request for /sync should have been executed") + return + } + callback(result) } - callback(result) } - waitForExpectations(timeout: 10000, handler: nil) + waitForExpectations(timeout: defaultTimeout, handler: nil) XCTAssertCall("exitRecoveryMode()", on: apiClient) }