Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve database container read and reset #3365

Merged
merged 1 commit into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions Sources/StreamChat/Database/DTOs/CurrentUserDTO.swift
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,16 @@ extension CurrentChatUser {
fileprivate static func create(fromDTO dto: CurrentUserDTO) throws -> CurrentChatUser {
let user = dto.user

let extraData: [String: RawJSON]
do {
extraData = try JSONDecoder.default.decode([String: RawJSON].self, from: dto.user.extraData)
} catch {
log.error(
"Failed to decode extra data for user with id: <\(dto.user.id)>, using default value instead. "
+ "Error: \(error)"
)
extraData = [:]
var extraData = [String: RawJSON]()
if !dto.user.extraData.isEmpty {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If data is empty, error is logged which can happen when trying to convert deleted DTO

do {
extraData = try JSONDecoder.default.decode([String: RawJSON].self, from: dto.user.extraData)
} catch {
log.error(
"Failed to decode extra data for user with id: <\(dto.user.id)>, using default value instead. "
+ "Error: \(error)"
)
}
}

let mutedUsers: [ChatUser] = try dto.mutedUsers.map { try $0.asModel() }
Expand Down
47 changes: 47 additions & 0 deletions Sources/StreamChat/Database/DTOs/MessageDTO.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1154,6 +1154,53 @@ extension NSManagedObjectContext: MessageDatabaseSession {
$0.localState = .pendingUpload
}
}

func loadMessage(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved these under DatabaseSession so that I can use DatabaseSession in async version of the read()

before id: MessageId,
cid: String
) throws -> MessageDTO? {
try MessageDTO.loadMessage(
before: id,
cid: cid,
deletedMessagesVisibility: deletedMessagesVisibility ?? .alwaysVisible,
shouldShowShadowedMessages: shouldShowShadowedMessages ?? true,
context: self
)
}

func loadMessages(
from fromIncludingDate: Date,
to toIncludingDate: Date,
in cid: ChannelId,
sortAscending: Bool
) throws -> [MessageDTO] {
try MessageDTO.loadMessages(
from: fromIncludingDate,
to: toIncludingDate,
in: cid,
sortAscending: sortAscending,
deletedMessagesVisibility: deletedMessagesVisibility ?? .alwaysVisible,
shouldShowShadowedMessages: shouldShowShadowedMessages ?? true,
context: self
)
}

func loadReplies(
from fromIncludingDate: Date,
to toIncludingDate: Date,
in messageId: MessageId,
sortAscending: Bool
) throws -> [MessageDTO] {
try MessageDTO.loadReplies(
from: fromIncludingDate,
to: toIncludingDate,
in: messageId,
sortAscending: sortAscending,
deletedMessagesVisibility: deletedMessagesVisibility ?? .alwaysVisible,
shouldShowShadowedMessages: shouldShowShadowedMessages ?? true,
context: self
)
}
}

extension MessageDTO {
Expand Down
82 changes: 47 additions & 35 deletions Sources/StreamChat/Database/DatabaseContainer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,29 @@ class DatabaseContainer: NSPersistentContainer {
}
}

func read<T>(_ actions: @escaping (DatabaseSession) throws -> T, completion: @escaping (Result<T, Error>) -> Void) {
let context = backgroundReadOnlyContext
func write(_ actions: @escaping (DatabaseSession) throws -> Void) async throws {
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
write(actions) { error in
if let error {
continuation.resume(throwing: error)
} else {
continuation.resume(returning: ())
}
}
}
}

private func read<T>(
from context: NSManagedObjectContext,
_ actions: @escaping (DatabaseSession) throws -> T,
completion: @escaping (Result<T, Error>) -> Void
) {
context.perform {
do {
let changeCounts = context.currentChangeCounts()
let results = try actions(context)
if context.hasChanges {
assertionFailure("Background context is read only, but calling actions() created changes")
if changeCounts != context.currentChangeCounts() {
assertionFailure("Context is read only, but actions created changes: (updated=\(context.updatedObjects), inserted=\(context.insertedObjects), deleted=\(context.deletedObjects)")
Comment on lines +275 to +278
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did not figure out why deleted message is kept tracked by the context which triggered this assert. Just changed it around so that we only trigger it when actually the actions closure changed something.

}
completion(.success(results))
} catch {
Expand All @@ -268,45 +284,20 @@ class DatabaseContainer: NSPersistentContainer {
}
}

func write(_ actions: @escaping (DatabaseSession) throws -> Void) async throws {
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
write(actions) { error in
if let error {
continuation.resume(throwing: error)
} else {
continuation.resume(returning: ())
}
}
}
func read<T>(_ actions: @escaping (DatabaseSession) throws -> T, completion: @escaping (Result<T, Error>) -> Void) {
read(from: backgroundReadOnlyContext, actions, completion: completion)
}

func read<T>(_ actions: @escaping (NSManagedObjectContext) throws -> T) async throws -> T {
let context = stateLayerContext
return try await withCheckedThrowingContinuation { continuation in
context.perform {
do {
let results = try actions(context)
if context.hasChanges {
assertionFailure("State layer context is read only, but calling actions() created changes")
}
continuation.resume(returning: results)
} catch {
continuation.resume(throwing: error)
}
func read<T>(_ actions: @escaping (DatabaseSession) throws -> T) async throws -> T {
try await withCheckedThrowingContinuation { continuation in
read(from: stateLayerContext, actions) { result in
continuation.resume(with: result)
}
}
}

/// Removes all data from the local storage.
func removeAllData(completion: ((Error?) -> Void)? = nil) {
/// Cleanup the current user cache for all manage object contexts.
allContext.forEach { context in
context.perform {
context.invalidateCurrentUserCache()
context.reset()
}
}

let entityNames = managedObjectModel.entities.compactMap(\.name)
writableContext.perform { [weak self] in
self?.canWriteData = false
Expand All @@ -332,11 +323,24 @@ class DatabaseContainer: NSPersistentContainer {
}
if !deletedObjectIds.isEmpty, let contexts = self?.allContext {
log.debug("Merging \(deletedObjectIds.count) deletions to contexts", subsystems: .database)
// Merging changes triggers DB observers to react to deletions
NSManagedObjectContext.mergeChanges(
fromRemoteContextSave: [NSDeletedObjectsKey: deletedObjectIds],
into: contexts
)
}
// Finally reset states of all the contexts after batch delete and deletion propagation.
if let writableContext = self?.writableContext, let allContext = self?.allContext {
writableContext.invalidateCurrentUserCache()
writableContext.reset()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you also tried reseting these context when a delete is performed on write function? (Not sure if this will do anything tho)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did not test that. Slightly afraid to do it because we use these contexts for DB observers. Thinking about letting it be 😄

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah probably better 👍

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, I tested it and it works. Maybe if we still have problems with this in the future we can go back here and apply the following change before the CatabaseContainer.write :

if !self.backgroundReadOnlyContext.deletedObjects.isEmpty {
      self.backgroundReadOnlyContext.reset()
 }


for context in allContext where context != writableContext {
context.performAndWait {
context.invalidateCurrentUserCache()
context.reset()
}
}
}
Comment on lines +332 to +343
Copy link
Contributor Author

@laevandus laevandus Aug 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved these here because context should be reset after we are done with the batch delete. Otherwise deletes stick around to contexts (because I added NSManagedObjectContext.mergeChanges above). Better to keep them clean when dealing with logout

self?.canWriteData = true
completion?(lastEncounteredError)
}
Expand Down Expand Up @@ -452,6 +456,14 @@ extension NSManagedObjectContext {
deletedObjects.forEach { discardChanges(for: $0) }
}

fileprivate func currentChangeCounts() -> [String: Int] {
[
"inserted": insertedObjects.count,
"updated": updatedObjects.count,
"deleted": deletedObjects.count
]
}

func observeChanges(in otherContext: NSManagedObjectContext) -> NSObjectProtocol {
assert(!automaticallyMergesChangesFromParent, "Duplicate change handling")
return NotificationCenter.default
Expand Down
14 changes: 14 additions & 0 deletions Sources/StreamChat/Database/DatabaseSession.swift
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,20 @@ protocol MessageDatabaseSession {
/// to avoid those from being stuck there in limbo.
/// Messages can get stuck in `.sending` state if the network request to send them takes to much, and the app is backgrounded or killed.
func rescueMessagesStuckInSending()

func loadMessages(
from fromIncludingDate: Date,
to toIncludingDate: Date,
in cid: ChannelId,
sortAscending: Bool
) throws -> [MessageDTO]

func loadReplies(
from fromIncludingDate: Date,
to toIncludingDate: Date,
in messageId: MessageId,
sortAscending: Bool
) throws -> [MessageDTO]
}

extension MessageDatabaseSession {
Expand Down
18 changes: 6 additions & 12 deletions Sources/StreamChat/Repositories/MessageRepository.swift
Original file line number Diff line number Diff line change
Expand Up @@ -333,31 +333,25 @@ class MessageRepository {
extension MessageRepository {
/// Fetches messages from the database with a date range.
func messages(from fromDate: Date, to toDate: Date, in cid: ChannelId) async throws -> [ChatMessage] {
try await database.read { context in
try MessageDTO.loadMessages(
try await database.read { session in
try session.loadMessages(
from: fromDate,
to: toDate,
in: cid,
sortAscending: true,
deletedMessagesVisibility: context.deletedMessagesVisibility ?? .alwaysVisible,
shouldShowShadowedMessages: context.shouldShowShadowedMessages ?? true,
context: context
sortAscending: true
)
.map { try $0.asModel() }
}
}

/// Fetches replies from the database with a date range.
func replies(from fromDate: Date, to toDate: Date, in message: MessageId) async throws -> [ChatMessage] {
try await database.read { context in
try MessageDTO.loadReplies(
try await database.read { session in
try session.loadReplies(
from: fromDate,
to: toDate,
in: message,
sortAscending: true,
deletedMessagesVisibility: context.deletedMessagesVisibility ?? .alwaysVisible,
shouldShowShadowedMessages: context.shouldShowShadowedMessages ?? true,
context: context
sortAscending: true
)
.map { try $0.asModel() }
}
Expand Down
4 changes: 2 additions & 2 deletions Sources/StreamChat/Workers/ChannelUpdater.swift
Original file line number Diff line number Diff line change
Expand Up @@ -718,8 +718,8 @@ extension ChannelUpdater {
}
}
guard let ids = payload.watchers?.map(\.id) else { return [] }
return try await database.read { context in
try ids.compactMap { try UserDTO.load(id: $0, context: context)?.asModel() }
return try await database.read { session in
try ids.compactMap { try session.user(id: $0)?.asModel() }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,14 @@ class DatabaseSession_Mock: DatabaseSession {
func rescueMessagesStuckInSending() {
underlyingSession.rescueMessagesStuckInSending()
}

func loadMessages(from fromIncludingDate: Date, to toIncludingDate: Date, in cid: ChannelId, sortAscending: Bool) throws -> [MessageDTO] {
try underlyingSession.loadMessages(from: fromIncludingDate, to: toIncludingDate, in: cid, sortAscending: sortAscending)
}

func loadReplies(from fromIncludingDate: Date, to toIncludingDate: Date, in messageId: MessageId, sortAscending: Bool) throws -> [MessageDTO] {
try underlyingSession.loadReplies(from: fromIncludingDate, to: toIncludingDate, in: messageId, sortAscending: sortAscending)
}

func reaction(messageId: MessageId, userId: UserId, type: MessageReactionType) -> MessageReactionDTO? {
underlyingSession.reaction(messageId: messageId, userId: userId, type: type)
Expand Down
Loading