diff --git a/Sources/StreamChat/Database/DTOs/CurrentUserDTO.swift b/Sources/StreamChat/Database/DTOs/CurrentUserDTO.swift index c5261889dc..cf7b0bdadf 100644 --- a/Sources/StreamChat/Database/DTOs/CurrentUserDTO.swift +++ b/Sources/StreamChat/Database/DTOs/CurrentUserDTO.swift @@ -203,15 +203,16 @@ extension CurrentChatUser { fileprivate static func create(fromDTO dto: CurrentUserDTO) throws -> CurrentChatUser { let user = dto.user - let extraData: [String: RawJSON] - do { - extraData = try JSONDecoder.default.decode([String: RawJSON].self, from: dto.user.extraData) - } catch { - log.error( - "Failed to decode extra data for user with id: <\(dto.user.id)>, using default value instead. " - + "Error: \(error)" - ) - extraData = [:] + var extraData = [String: RawJSON]() + if !dto.user.extraData.isEmpty { + do { + extraData = try JSONDecoder.default.decode([String: RawJSON].self, from: dto.user.extraData) + } catch { + log.error( + "Failed to decode extra data for user with id: <\(dto.user.id)>, using default value instead. " + + "Error: \(error)" + ) + } } let mutedUsers: [ChatUser] = try dto.mutedUsers.map { try $0.asModel() } diff --git a/Sources/StreamChat/Database/DTOs/MessageDTO.swift b/Sources/StreamChat/Database/DTOs/MessageDTO.swift index 81b05497ff..d5dbc4508e 100644 --- a/Sources/StreamChat/Database/DTOs/MessageDTO.swift +++ b/Sources/StreamChat/Database/DTOs/MessageDTO.swift @@ -1154,6 +1154,53 @@ extension NSManagedObjectContext: MessageDatabaseSession { $0.localState = .pendingUpload } } + + func loadMessage( + before id: MessageId, + cid: String + ) throws -> MessageDTO? { + try MessageDTO.loadMessage( + before: id, + cid: cid, + deletedMessagesVisibility: deletedMessagesVisibility ?? .alwaysVisible, + shouldShowShadowedMessages: shouldShowShadowedMessages ?? true, + context: self + ) + } + + func loadMessages( + from fromIncludingDate: Date, + to toIncludingDate: Date, + in cid: ChannelId, + sortAscending: Bool + ) throws -> [MessageDTO] { + try MessageDTO.loadMessages( + from: fromIncludingDate, + to: toIncludingDate, + in: cid, + sortAscending: sortAscending, + deletedMessagesVisibility: deletedMessagesVisibility ?? .alwaysVisible, + shouldShowShadowedMessages: shouldShowShadowedMessages ?? true, + context: self + ) + } + + func loadReplies( + from fromIncludingDate: Date, + to toIncludingDate: Date, + in messageId: MessageId, + sortAscending: Bool + ) throws -> [MessageDTO] { + try MessageDTO.loadReplies( + from: fromIncludingDate, + to: toIncludingDate, + in: messageId, + sortAscending: sortAscending, + deletedMessagesVisibility: deletedMessagesVisibility ?? .alwaysVisible, + shouldShowShadowedMessages: shouldShowShadowedMessages ?? true, + context: self + ) + } } extension MessageDTO { diff --git a/Sources/StreamChat/Database/DatabaseContainer.swift b/Sources/StreamChat/Database/DatabaseContainer.swift index f413fe8031..4fb3ec1fe1 100644 --- a/Sources/StreamChat/Database/DatabaseContainer.swift +++ b/Sources/StreamChat/Database/DatabaseContainer.swift @@ -253,13 +253,29 @@ class DatabaseContainer: NSPersistentContainer { } } - func read(_ actions: @escaping (DatabaseSession) throws -> T, completion: @escaping (Result) -> Void) { - let context = backgroundReadOnlyContext + func write(_ actions: @escaping (DatabaseSession) throws -> Void) async throws { + try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + write(actions) { error in + if let error { + continuation.resume(throwing: error) + } else { + continuation.resume(returning: ()) + } + } + } + } + + private func read( + from context: NSManagedObjectContext, + _ actions: @escaping (DatabaseSession) throws -> T, + completion: @escaping (Result) -> Void + ) { context.perform { do { + let changeCounts = context.currentChangeCounts() let results = try actions(context) - if context.hasChanges { - assertionFailure("Background context is read only, but calling actions() created changes") + if changeCounts != context.currentChangeCounts() { + assertionFailure("Context is read only, but actions created changes: (updated=\(context.updatedObjects), inserted=\(context.insertedObjects), deleted=\(context.deletedObjects)") } completion(.success(results)) } catch { @@ -268,45 +284,20 @@ class DatabaseContainer: NSPersistentContainer { } } - func write(_ actions: @escaping (DatabaseSession) throws -> Void) async throws { - try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - write(actions) { error in - if let error { - continuation.resume(throwing: error) - } else { - continuation.resume(returning: ()) - } - } - } + func read(_ actions: @escaping (DatabaseSession) throws -> T, completion: @escaping (Result) -> Void) { + read(from: backgroundReadOnlyContext, actions, completion: completion) } - func read(_ actions: @escaping (NSManagedObjectContext) throws -> T) async throws -> T { - let context = stateLayerContext - return try await withCheckedThrowingContinuation { continuation in - context.perform { - do { - let results = try actions(context) - if context.hasChanges { - assertionFailure("State layer context is read only, but calling actions() created changes") - } - continuation.resume(returning: results) - } catch { - continuation.resume(throwing: error) - } + func read(_ actions: @escaping (DatabaseSession) throws -> T) async throws -> T { + try await withCheckedThrowingContinuation { continuation in + read(from: stateLayerContext, actions) { result in + continuation.resume(with: result) } } } /// Removes all data from the local storage. func removeAllData(completion: ((Error?) -> Void)? = nil) { - /// Cleanup the current user cache for all manage object contexts. - allContext.forEach { context in - context.perform { - context.invalidateCurrentUserCache() - context.reset() - } - } - let entityNames = managedObjectModel.entities.compactMap(\.name) writableContext.perform { [weak self] in self?.canWriteData = false @@ -332,11 +323,24 @@ class DatabaseContainer: NSPersistentContainer { } if !deletedObjectIds.isEmpty, let contexts = self?.allContext { log.debug("Merging \(deletedObjectIds.count) deletions to contexts", subsystems: .database) + // Merging changes triggers DB observers to react to deletions NSManagedObjectContext.mergeChanges( fromRemoteContextSave: [NSDeletedObjectsKey: deletedObjectIds], into: contexts ) } + // Finally reset states of all the contexts after batch delete and deletion propagation. + if let writableContext = self?.writableContext, let allContext = self?.allContext { + writableContext.invalidateCurrentUserCache() + writableContext.reset() + + for context in allContext where context != writableContext { + context.performAndWait { + context.invalidateCurrentUserCache() + context.reset() + } + } + } self?.canWriteData = true completion?(lastEncounteredError) } @@ -452,6 +456,14 @@ extension NSManagedObjectContext { deletedObjects.forEach { discardChanges(for: $0) } } + fileprivate func currentChangeCounts() -> [String: Int] { + [ + "inserted": insertedObjects.count, + "updated": updatedObjects.count, + "deleted": deletedObjects.count + ] + } + func observeChanges(in otherContext: NSManagedObjectContext) -> NSObjectProtocol { assert(!automaticallyMergesChangesFromParent, "Duplicate change handling") return NotificationCenter.default diff --git a/Sources/StreamChat/Database/DatabaseSession.swift b/Sources/StreamChat/Database/DatabaseSession.swift index 6fcc640199..fb9c26b08f 100644 --- a/Sources/StreamChat/Database/DatabaseSession.swift +++ b/Sources/StreamChat/Database/DatabaseSession.swift @@ -194,6 +194,20 @@ protocol MessageDatabaseSession { /// to avoid those from being stuck there in limbo. /// Messages can get stuck in `.sending` state if the network request to send them takes to much, and the app is backgrounded or killed. func rescueMessagesStuckInSending() + + func loadMessages( + from fromIncludingDate: Date, + to toIncludingDate: Date, + in cid: ChannelId, + sortAscending: Bool + ) throws -> [MessageDTO] + + func loadReplies( + from fromIncludingDate: Date, + to toIncludingDate: Date, + in messageId: MessageId, + sortAscending: Bool + ) throws -> [MessageDTO] } extension MessageDatabaseSession { diff --git a/Sources/StreamChat/Repositories/MessageRepository.swift b/Sources/StreamChat/Repositories/MessageRepository.swift index 60f9046a04..2d0ea7979d 100644 --- a/Sources/StreamChat/Repositories/MessageRepository.swift +++ b/Sources/StreamChat/Repositories/MessageRepository.swift @@ -333,15 +333,12 @@ class MessageRepository { extension MessageRepository { /// Fetches messages from the database with a date range. func messages(from fromDate: Date, to toDate: Date, in cid: ChannelId) async throws -> [ChatMessage] { - try await database.read { context in - try MessageDTO.loadMessages( + try await database.read { session in + try session.loadMessages( from: fromDate, to: toDate, in: cid, - sortAscending: true, - deletedMessagesVisibility: context.deletedMessagesVisibility ?? .alwaysVisible, - shouldShowShadowedMessages: context.shouldShowShadowedMessages ?? true, - context: context + sortAscending: true ) .map { try $0.asModel() } } @@ -349,15 +346,12 @@ extension MessageRepository { /// Fetches replies from the database with a date range. func replies(from fromDate: Date, to toDate: Date, in message: MessageId) async throws -> [ChatMessage] { - try await database.read { context in - try MessageDTO.loadReplies( + try await database.read { session in + try session.loadReplies( from: fromDate, to: toDate, in: message, - sortAscending: true, - deletedMessagesVisibility: context.deletedMessagesVisibility ?? .alwaysVisible, - shouldShowShadowedMessages: context.shouldShowShadowedMessages ?? true, - context: context + sortAscending: true ) .map { try $0.asModel() } } diff --git a/Sources/StreamChat/Workers/ChannelUpdater.swift b/Sources/StreamChat/Workers/ChannelUpdater.swift index 2394dc0dec..6417555271 100644 --- a/Sources/StreamChat/Workers/ChannelUpdater.swift +++ b/Sources/StreamChat/Workers/ChannelUpdater.swift @@ -718,8 +718,8 @@ extension ChannelUpdater { } } guard let ids = payload.watchers?.map(\.id) else { return [] } - return try await database.read { context in - try ids.compactMap { try UserDTO.load(id: $0, context: context)?.asModel() } + return try await database.read { session in + try ids.compactMap { try session.user(id: $0)?.asModel() } } } diff --git a/TestTools/StreamChatTestTools/Mocks/StreamChat/Database/DatabaseSession_Mock.swift b/TestTools/StreamChatTestTools/Mocks/StreamChat/Database/DatabaseSession_Mock.swift index 9a04f23457..a32ac46c0d 100644 --- a/TestTools/StreamChatTestTools/Mocks/StreamChat/Database/DatabaseSession_Mock.swift +++ b/TestTools/StreamChatTestTools/Mocks/StreamChat/Database/DatabaseSession_Mock.swift @@ -215,6 +215,14 @@ class DatabaseSession_Mock: DatabaseSession { func rescueMessagesStuckInSending() { underlyingSession.rescueMessagesStuckInSending() } + + func loadMessages(from fromIncludingDate: Date, to toIncludingDate: Date, in cid: ChannelId, sortAscending: Bool) throws -> [MessageDTO] { + try underlyingSession.loadMessages(from: fromIncludingDate, to: toIncludingDate, in: cid, sortAscending: sortAscending) + } + + func loadReplies(from fromIncludingDate: Date, to toIncludingDate: Date, in messageId: MessageId, sortAscending: Bool) throws -> [MessageDTO] { + try underlyingSession.loadReplies(from: fromIncludingDate, to: toIncludingDate, in: messageId, sortAscending: sortAscending) + } func reaction(messageId: MessageId, userId: UserId, type: MessageReactionType) -> MessageReactionDTO? { underlyingSession.reaction(messageId: messageId, userId: userId, type: type)