Skip to content

Commit

Permalink
Use DatabaseSession type in both database read wrappers. Fix an issue…
Browse files Browse the repository at this point in the history
… where on logout contexts are referencing DTOs
  • Loading branch information
laevandus committed Aug 7, 2024
1 parent c6bc553 commit befe2cf
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 58 deletions.
19 changes: 10 additions & 9 deletions Sources/StreamChat/Database/DTOs/CurrentUserDTO.swift
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,16 @@ extension CurrentChatUser {
fileprivate static func create(fromDTO dto: CurrentUserDTO) throws -> CurrentChatUser {
let user = dto.user

let extraData: [String: RawJSON]
do {
extraData = try JSONDecoder.default.decode([String: RawJSON].self, from: dto.user.extraData)
} catch {
log.error(
"Failed to decode extra data for user with id: <\(dto.user.id)>, using default value instead. "
+ "Error: \(error)"
)
extraData = [:]
var extraData = [String: RawJSON]()
if !dto.user.extraData.isEmpty {
do {
extraData = try JSONDecoder.default.decode([String: RawJSON].self, from: dto.user.extraData)
} catch {
log.error(
"Failed to decode extra data for user with id: <\(dto.user.id)>, using default value instead. "
+ "Error: \(error)"
)
}
}

let mutedUsers: [ChatUser] = try dto.mutedUsers.map { try $0.asModel() }
Expand Down
47 changes: 47 additions & 0 deletions Sources/StreamChat/Database/DTOs/MessageDTO.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1154,6 +1154,53 @@ extension NSManagedObjectContext: MessageDatabaseSession {
$0.localState = .pendingUpload
}
}

func loadMessage(
before id: MessageId,
cid: String
) throws -> MessageDTO? {
try MessageDTO.loadMessage(
before: id,
cid: cid,
deletedMessagesVisibility: deletedMessagesVisibility ?? .alwaysVisible,
shouldShowShadowedMessages: shouldShowShadowedMessages ?? true,
context: self
)
}

func loadMessages(
from fromIncludingDate: Date,
to toIncludingDate: Date,
in cid: ChannelId,
sortAscending: Bool
) throws -> [MessageDTO] {
try MessageDTO.loadMessages(
from: fromIncludingDate,
to: toIncludingDate,
in: cid,
sortAscending: sortAscending,
deletedMessagesVisibility: deletedMessagesVisibility ?? .alwaysVisible,
shouldShowShadowedMessages: shouldShowShadowedMessages ?? true,
context: self
)
}

func loadReplies(
from fromIncludingDate: Date,
to toIncludingDate: Date,
in messageId: MessageId,
sortAscending: Bool
) throws -> [MessageDTO] {
try MessageDTO.loadReplies(
from: fromIncludingDate,
to: toIncludingDate,
in: messageId,
sortAscending: sortAscending,
deletedMessagesVisibility: deletedMessagesVisibility ?? .alwaysVisible,
shouldShowShadowedMessages: shouldShowShadowedMessages ?? true,
context: self
)
}
}

extension MessageDTO {
Expand Down
82 changes: 47 additions & 35 deletions Sources/StreamChat/Database/DatabaseContainer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,29 @@ class DatabaseContainer: NSPersistentContainer {
}
}

func read<T>(_ actions: @escaping (DatabaseSession) throws -> T, completion: @escaping (Result<T, Error>) -> Void) {
let context = backgroundReadOnlyContext
func write(_ actions: @escaping (DatabaseSession) throws -> Void) async throws {
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
write(actions) { error in
if let error {
continuation.resume(throwing: error)
} else {
continuation.resume(returning: ())
}
}
}
}

private func read<T>(
from context: NSManagedObjectContext,
_ actions: @escaping (DatabaseSession) throws -> T,
completion: @escaping (Result<T, Error>) -> Void
) {
context.perform {
do {
let changeCounts = context.currentChangeCounts()
let results = try actions(context)
if context.hasChanges {
assertionFailure("Background context is read only, but calling actions() created changes")
if changeCounts != context.currentChangeCounts() {
assertionFailure("Context is read only, but actions created changes: (updated=\(context.updatedObjects), inserted=\(context.insertedObjects), deleted=\(context.deletedObjects)")
}
completion(.success(results))
} catch {
Expand All @@ -268,45 +284,20 @@ class DatabaseContainer: NSPersistentContainer {
}
}

func write(_ actions: @escaping (DatabaseSession) throws -> Void) async throws {
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
write(actions) { error in
if let error {
continuation.resume(throwing: error)
} else {
continuation.resume(returning: ())
}
}
}
func read<T>(_ actions: @escaping (DatabaseSession) throws -> T, completion: @escaping (Result<T, Error>) -> Void) {
read(from: backgroundReadOnlyContext, actions, completion: completion)
}

func read<T>(_ actions: @escaping (NSManagedObjectContext) throws -> T) async throws -> T {
let context = stateLayerContext
return try await withCheckedThrowingContinuation { continuation in
context.perform {
do {
let results = try actions(context)
if context.hasChanges {
assertionFailure("State layer context is read only, but calling actions() created changes")
}
continuation.resume(returning: results)
} catch {
continuation.resume(throwing: error)
}
func read<T>(_ actions: @escaping (DatabaseSession) throws -> T) async throws -> T {
try await withCheckedThrowingContinuation { continuation in
read(from: stateLayerContext, actions) { result in
continuation.resume(with: result)
}
}
}

/// Removes all data from the local storage.
func removeAllData(completion: ((Error?) -> Void)? = nil) {
/// Cleanup the current user cache for all manage object contexts.
allContext.forEach { context in
context.perform {
context.invalidateCurrentUserCache()
context.reset()
}
}

let entityNames = managedObjectModel.entities.compactMap(\.name)
writableContext.perform { [weak self] in
self?.canWriteData = false
Expand All @@ -332,11 +323,24 @@ class DatabaseContainer: NSPersistentContainer {
}
if !deletedObjectIds.isEmpty, let contexts = self?.allContext {
log.debug("Merging \(deletedObjectIds.count) deletions to contexts", subsystems: .database)
// Merging changes triggers DB observers to react to deletions
NSManagedObjectContext.mergeChanges(
fromRemoteContextSave: [NSDeletedObjectsKey: deletedObjectIds],
into: contexts
)
}
// Finally reset states of all the contexts after batch delete and deletion propagation.
if let writableContext = self?.writableContext, let allContext = self?.allContext {
writableContext.invalidateCurrentUserCache()
writableContext.reset()

for context in allContext where context != writableContext {
context.performAndWait {
context.invalidateCurrentUserCache()
context.reset()
}
}
}
self?.canWriteData = true
completion?(lastEncounteredError)
}
Expand Down Expand Up @@ -452,6 +456,14 @@ extension NSManagedObjectContext {
deletedObjects.forEach { discardChanges(for: $0) }
}

fileprivate func currentChangeCounts() -> [String: Int] {
[
"inserted": insertedObjects.count,
"updated": updatedObjects.count,
"deleted": deletedObjects.count
]
}

func observeChanges(in otherContext: NSManagedObjectContext) -> NSObjectProtocol {
assert(!automaticallyMergesChangesFromParent, "Duplicate change handling")
return NotificationCenter.default
Expand Down
14 changes: 14 additions & 0 deletions Sources/StreamChat/Database/DatabaseSession.swift
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,20 @@ protocol MessageDatabaseSession {
/// to avoid those from being stuck there in limbo.
/// Messages can get stuck in `.sending` state if the network request to send them takes to much, and the app is backgrounded or killed.
func rescueMessagesStuckInSending()

func loadMessages(
from fromIncludingDate: Date,
to toIncludingDate: Date,
in cid: ChannelId,
sortAscending: Bool
) throws -> [MessageDTO]

func loadReplies(
from fromIncludingDate: Date,
to toIncludingDate: Date,
in messageId: MessageId,
sortAscending: Bool
) throws -> [MessageDTO]
}

extension MessageDatabaseSession {
Expand Down
18 changes: 6 additions & 12 deletions Sources/StreamChat/Repositories/MessageRepository.swift
Original file line number Diff line number Diff line change
Expand Up @@ -333,31 +333,25 @@ class MessageRepository {
extension MessageRepository {
/// Fetches messages from the database with a date range.
func messages(from fromDate: Date, to toDate: Date, in cid: ChannelId) async throws -> [ChatMessage] {
try await database.read { context in
try MessageDTO.loadMessages(
try await database.read { session in
try session.loadMessages(
from: fromDate,
to: toDate,
in: cid,
sortAscending: true,
deletedMessagesVisibility: context.deletedMessagesVisibility ?? .alwaysVisible,
shouldShowShadowedMessages: context.shouldShowShadowedMessages ?? true,
context: context
sortAscending: true
)
.map { try $0.asModel() }
}
}

/// Fetches replies from the database with a date range.
func replies(from fromDate: Date, to toDate: Date, in message: MessageId) async throws -> [ChatMessage] {
try await database.read { context in
try MessageDTO.loadReplies(
try await database.read { session in
try session.loadReplies(
from: fromDate,
to: toDate,
in: message,
sortAscending: true,
deletedMessagesVisibility: context.deletedMessagesVisibility ?? .alwaysVisible,
shouldShowShadowedMessages: context.shouldShowShadowedMessages ?? true,
context: context
sortAscending: true
)
.map { try $0.asModel() }
}
Expand Down
4 changes: 2 additions & 2 deletions Sources/StreamChat/Workers/ChannelUpdater.swift
Original file line number Diff line number Diff line change
Expand Up @@ -725,8 +725,8 @@ extension ChannelUpdater {
}
}
guard let ids = payload.watchers?.map(\.id) else { return [] }
return try await database.read { context in
try ids.compactMap { try UserDTO.load(id: $0, context: context)?.asModel() }
return try await database.read { session in
try ids.compactMap { try session.user(id: $0)?.asModel() }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,14 @@ class DatabaseSession_Mock: DatabaseSession {
func rescueMessagesStuckInSending() {
underlyingSession.rescueMessagesStuckInSending()
}

func loadMessages(from fromIncludingDate: Date, to toIncludingDate: Date, in cid: ChannelId, sortAscending: Bool) throws -> [MessageDTO] {
try underlyingSession.loadMessages(from: fromIncludingDate, to: toIncludingDate, in: cid, sortAscending: sortAscending)
}

func loadReplies(from fromIncludingDate: Date, to toIncludingDate: Date, in messageId: MessageId, sortAscending: Bool) throws -> [MessageDTO] {
try underlyingSession.loadReplies(from: fromIncludingDate, to: toIncludingDate, in: messageId, sortAscending: sortAscending)
}

func reaction(messageId: MessageId, userId: UserId, type: MessageReactionType) -> MessageReactionDTO? {
underlyingSession.reaction(messageId: messageId, userId: userId, type: type)
Expand Down

0 comments on commit befe2cf

Please sign in to comment.