Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix reconnection timeout handler not working in the token provider phase #3513

1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
## StreamChat
### 🐞 Fixed
- Fix a rare infinite loop triggering a crash when handling database changes [#3508](https://github.com/GetStream/stream-chat-swift/pull/3508)
- Fix reconnection timeout handler not working in the token provider phase [#3513](https://github.com/GetStream/stream-chat-swift/pull/3513)

# [4.67.0](https://github.com/GetStream/stream-chat-swift/releases/tag/4.67.0)
_November 25, 2024_
Expand Down
11 changes: 7 additions & 4 deletions Sources/StreamChat/ChatClient+Environment.swift
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ extension ChatClient {
)
}

var reconnectionHandlerBuilder: (_ chatClientConfig: ChatClientConfig) -> StreamTimer? = {
guard let reconnectionTimeout = $0.reconnectionTimeout else { return nil }
return ScheduledStreamTimer(interval: reconnectionTimeout, fireOnStart: false, repeats: false)
}

var extensionLifecycleBuilder = NotificationExtensionLifecycle.init

var requestEncoderBuilder: (_ baseURL: URL, _ apiKey: APIKey) -> RequestEncoder = DefaultRequestEncoder.init
Expand Down Expand Up @@ -97,8 +102,7 @@ extension ChatClient {
_ extensionLifecycle: NotificationExtensionLifecycle,
_ backgroundTaskScheduler: BackgroundTaskScheduler?,
_ internetConnection: InternetConnection,
_ keepConnectionAliveInBackground: Bool,
_ reconnectionTimeoutHandler: StreamTimer?
_ keepConnectionAliveInBackground: Bool
) -> ConnectionRecoveryHandler = {
DefaultConnectionRecoveryHandler(
webSocketClient: $0,
Expand All @@ -109,8 +113,7 @@ extension ChatClient {
internetConnection: $5,
reconnectionStrategy: DefaultRetryStrategy(),
reconnectionTimerType: DefaultTimer.self,
keepConnectionAliveInBackground: $6,
reconnectionTimeoutHandler: $7
keepConnectionAliveInBackground: $6
)
}

Expand Down
48 changes: 45 additions & 3 deletions Sources/StreamChat/ChatClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ public class ChatClient {
/// Used as a bridge to communicate between the host app and the notification extension. Holds the state for the app lifecycle.
let extensionLifecycle: NotificationExtensionLifecycle

/// The component responsible to timeout the user connection if it takes more time than the `ChatClientConfig.reconnectionTimeout`.
var reconnectionTimeoutHandler: StreamTimer?

/// The environment object containing all dependencies of this `Client` instance.
private let environment: Environment

Expand Down Expand Up @@ -219,12 +222,18 @@ public class ChatClient {
setupOfflineRequestQueue()
setupConnectionRecoveryHandler(with: environment)
validateIntegrity()

reconnectionTimeoutHandler = environment.reconnectionHandlerBuilder(config)
reconnectionTimeoutHandler?.onChange = { [weak self] in
self?.timeout()
}
}

deinit {
Self._activeLocalStorageURLs.mutate { $0.subtract(databaseContainer.persistentStoreDescriptions.compactMap(\.url)) }
completeConnectionIdWaiters(connectionId: nil)
completeTokenWaiters(token: nil)
reconnectionTimeoutHandler?.stop()
}

func setupTokenRefresher() {
Expand Down Expand Up @@ -254,8 +263,7 @@ public class ChatClient {
extensionLifecycle,
environment.backgroundTaskSchedulerBuilder(),
environment.internetConnection(eventNotificationCenter, environment.internetMonitor),
config.staysConnectedInBackground,
config.reconnectionTimeout.map { ScheduledStreamTimer(interval: $0, fireOnStart: false, repeats: false) }
config.staysConnectedInBackground
)
}

Expand Down Expand Up @@ -300,7 +308,9 @@ public class ChatClient {
tokenProvider: @escaping TokenProvider,
completion: ((Error?) -> Void)? = nil
) {
reconnectionTimeoutHandler?.start()
connectionRecoveryHandler?.start()
connectionRepository.initialize()

authenticationRepository.connectUser(
userInfo: userInfo,
Expand Down Expand Up @@ -393,7 +403,9 @@ public class ChatClient {
userInfo: UserInfo,
completion: ((Error?) -> Void)? = nil
) {
connectionRepository.initialize()
connectionRecoveryHandler?.start()
reconnectionTimeoutHandler?.start()
authenticationRepository.connectGuestUser(userInfo: userInfo, completion: { completion?($0) })
}

Expand All @@ -417,6 +429,8 @@ public class ChatClient {
/// Connects an anonymous user
/// - Parameter completion: The completion that will be called once the **first** user session for the given token is setup.
public func connectAnonymousUser(completion: ((Error?) -> Void)? = nil) {
connectionRepository.initialize()
reconnectionTimeoutHandler?.start()
connectionRecoveryHandler?.start()
authenticationRepository.connectAnonymousUser(
completion: { completion?($0) }
Expand Down Expand Up @@ -458,7 +472,7 @@ public class ChatClient {
completion()
}
authenticationRepository.clearTokenProvider()
authenticationRepository.cancelTimers()
authenticationRepository.reset()
Comment on lines 474 to +475
Copy link
Contributor

@laevandus laevandus Nov 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should reconnectionTimeoutHandler?.stop() be called here as well or it does not matter?

Thinking about the case of calling disconnect quickly after calling connect.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not really matter, because this is triggered by the reconnectionTimeoutHandler which has repeats: false, so it will be already stopped by the time it reaches here

}

/// Disconnects the chat client from the chat servers. No further updates from the servers
Expand Down Expand Up @@ -617,6 +631,15 @@ public class ChatClient {
completion?($0)
}
}

private func timeout() {
completeConnectionIdWaiters(connectionId: nil)
authenticationRepository.completeTokenCompletions(error: ClientError.ReconnectionTimeout())
completeTokenWaiters(token: nil)
authenticationRepository.reset()
let webSocketConnectionState = webSocketClient?.connectionState ?? .initialized
connectionRepository.disconnect(source: .timeout(from: webSocketConnectionState)) {}
}
}

extension ChatClient: AuthenticationRepositoryDelegate {
Expand Down Expand Up @@ -646,6 +669,17 @@ extension ChatClient: ConnectionStateDelegate {
)
connectionRecoveryHandler?.webSocketClient(client, didUpdateConnectionState: state)
try? backgroundWorker(of: MessageSender.self).didUpdateConnectionState(state)

switch state {
case .connecting:
if reconnectionTimeoutHandler?.isRunning == false {
reconnectionTimeoutHandler?.start()
}
case .connected:
reconnectionTimeoutHandler?.stop()
default:
break
}
}
}

Expand Down Expand Up @@ -692,6 +726,14 @@ extension ClientError {
}
}

public final class ReconnectionTimeout: ClientError {
override public var localizedDescription: String {
"""
The reconnection process has timed out after surpassing the value from `ChatClientConfig.reconnectionTimeout`.
"""
}
}

public final class MissingToken: ClientError {}
final class WaiterTimeout: ClientError {}

Expand Down
29 changes: 18 additions & 11 deletions Sources/StreamChat/Repositories/AuthenticationRepository.swift
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,12 @@ class AuthenticationRepository {
isGettingToken = false
}

func cancelTimers() {
func reset() {
connectionProviderTimer?.cancel()
tokenProviderTimer?.cancel()
tokenQueue.async(flags: .barrier) {
self._tokenExpirationRetryStrategy.resetConsecutiveFailures()
}
}

func logOutUser() {
Expand Down Expand Up @@ -280,6 +283,19 @@ class AuthenticationRepository {
updateToken(token: token, notifyTokenWaiters: true)
}

func completeTokenCompletions(error: Error?) {
let completionBlocks: [(Error?) -> Void]? = tokenQueue.sync(flags: .barrier) {
self._isGettingToken = false
let completions = self._tokenRequestCompletions
return completions
}
completionBlocks?.forEach { $0(error) }
tokenQueue.async(flags: .barrier) {
self._tokenRequestCompletions = []
self._consecutiveRefreshFailures = 0
}
}

private func updateToken(token: Token?, notifyTokenWaiters: Bool) {
let waiters: [String: (Result<Token, Error>) -> Void] = tokenQueue.sync(flags: .barrier) {
_currentToken = token
Expand Down Expand Up @@ -331,21 +347,12 @@ class AuthenticationRepository {
isGettingToken = true

let onCompletion: (Error?) -> Void = { [weak self] error in
guard let self = self else { return }
if let error = error {
log.error("Error when getting token: \(error)", subsystems: .authentication)
} else {
log.debug("Successfully retrieved token", subsystems: .authentication)
}

let completionBlocks: [(Error?) -> Void]? = self.tokenQueue.sync(flags: .barrier) {
self._isGettingToken = false
let completions = self._tokenRequestCompletions
self._tokenRequestCompletions = []
self._consecutiveRefreshFailures = 0
return completions
}
completionBlocks?.forEach { $0(error) }
self?.completeTokenCompletions(error: error)
}

guard consecutiveRefreshFailures < Constants.maximumTokenRefreshAttempts else {
Expand Down
12 changes: 4 additions & 8 deletions Sources/StreamChat/Repositories/ConnectionRepository.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ class ConnectionRepository {
self.timerType = timerType
}

func initialize() {
webSocketClient?.initialize()
}
Comment on lines +45 to +47
Copy link
Member Author

@nuno-vieira nuno-vieira Nov 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be introduced. Otherwise, we won't report the status of the new connection to the delegates. This is especially important when using ChatClient as a singleton. This is the problem:

  1. connectUser() (connectionStatus = .initialized) - keep in mind that it only goes to .connecting, after the token provider finishes
  2. timeout() (connectinStatus = .disconnected) - Disconnected status is reported ✅
  3. connectUser() again (connectionStatus == .disconnected) - Connecting status is not reported ❌ , because the status did not change

Since we don't reset the connection status, it keeps at the .disconnected, and so the .connecting or .initialized is not reported when manually reconnecting.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when you call connectUser again, shouldn't it move to initialized / connecting already? Because there should be a status change when you start connecting again.

Copy link
Member Author

@nuno-vieira nuno-vieira Nov 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added more details to point 1.. The problem here is that we only set connectionState = .connecting after the token phase finishes. So, if the token provider keeps failing, we never set the websocket to connecting and it will be kept at disconnected on the second try, which means we won't report any connection state change, since it will be stuck at disconnected.

This approach actually makes sense, before the connect() call is made to the WebSocket engine, the state is initialized. So, when the token provider fails and we did not connect the websocket engine, we need to restart the state to initialized. (Since the timeout action will report disconnect)

Copy link
Member Author

@nuno-vieira nuno-vieira Nov 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall overview:

stateDiagram-v2
    [*] --> initialize
    initialize --> ChatClient.Connect
    ChatClient.Connect --> TokenProvider
    
    TokenProvider --> ReconnectionTimeout: Fails
    TokenProvider --> WebSocket.Connect: Success
    
     ReconnectionTimeout --> disconnected
     disconnected --> ChatClient.Connect
    
    note right of WebSocket.Connect
        State reported: .connecting
    end note
    
    note right of initialize
        State reported: .initialize
    end note
    
    note right of disconnected
        State reported: .disconnected
    end note
    
    note left of disconnected
        On the second cycle, the state is not reported
        Because it did not change from .disconnected
    end note
Loading

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally it would be nice to see .connecting as the state just after calling connect(). As a SDK user, who does not know internals, that would make sense. That said, it might make sense to do this separately (because we have this distinction at the moment where .initialized is used during the token fetch phase).

Especially because documentation says (one typo there):

    /// The initial state meaning that  there was no atempt to connect yet.
    case initialized

That is confusing for the SDK user.

My view is that I feel like it is OK in the PR to go for .initialized, because this is what how the status has been reported previously, but then in a follow-up PR actually change this to .connecting because that would make more sense for SDK users (thinking about splitting because no idea what side-effects it could have and that should be tested throughly).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, exactly, I had the same thinking as Toomas. Basically, this part:
"The problem here is that we only set connectionState = .connecting after the token phase finishes.", doesn't feel correct. We are connecting (we are trying to get a token), but that's not reflected anywhere. To reduce risks, I'm fine to merge it like this as well, but yeah, we should make this more explicit.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, changing this would be very risky at the moment, that is why I decided not to do, and I remember I had a couple of issues trying it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@laevandus I changed that comment since it is internal either way. I tried changing the state to .connecting but this breaks other stuff, and so for now I prefer to not risk it. At most, adding another internal state, like .fetchingToken or something like this, would be better and not cause any impact on the rest of the logic. But for now, I think this is more than enough 👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree that we should try to tackle in another issue & PR.


/// Connects the chat client the controller represents to the chat servers.
///
/// When the connection is established, `ChatClient` starts receiving chat updates, and `currentUser` variable is available.
Expand Down Expand Up @@ -95,14 +99,6 @@ class ConnectionRepository {
return
}

if connectionId == nil {
if source == .userInitiated {
log.warning("The client is already disconnected. Skipping the `disconnect` call.")
}
completion()
return
}

Comment on lines -98 to -105
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an unnecessary optimization that just complicates things. For example, when we timeout, we don't have the connectionId, so the webSocketClient? .disconnect won't be called. Which means no connection status changes will be reported.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, this makes sense.

// Disconnect the web socket
webSocketClient?.disconnect(source: source) { [weak self] in
// Reset `connectionId`. This would happen asynchronously by the callback from WebSocketClient anyway, but it's
Expand Down
23 changes: 11 additions & 12 deletions Sources/StreamChat/WebSocketClient/WebSocketClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ class WebSocketClient {
self.eventNotificationCenter = eventNotificationCenter
}

func initialize() {
connectionState = .initialized
}

/// Connects the web connect.
///
/// Calling this method has no effect is the web socket is already connected, or is in the connecting phase.
Expand Down Expand Up @@ -137,23 +141,18 @@ class WebSocketClient {
source: WebSocketConnectionState.DisconnectionSource = .userInitiated,
completion: @escaping () -> Void
) {
connectionState = .disconnecting(source: source)
engineQueue.async { [engine, eventsBatcher] in
engine?.disconnect()

eventsBatcher.processImmediately(completion: completion)
switch connectionState {
case .initialized, .disconnected, .disconnecting:
connectionState = .disconnected(source: source)
case .connecting, .waitingForConnectionId, .connected:
connectionState = .disconnecting(source: source)
Comment on lines +144 to +148
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when disconnecting, if we are already disconnected, we should not put the state to disconnecting since the engine won't do anything, and so it won't report the disconnected afterwards. For this reason, we need to instantly report as disconnected for this scenario.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems tricky, previously it seems we were also processing events. Can you explain a bit more why we delete that part?
Also, the switch itself is a bit strange to me, especially the second case. If you are disconnected, and you are connecting, why should it go disconnecting?

Copy link
Member Author

@nuno-vieira nuno-vieira Nov 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not change the processing events part. The github diffing is not very clear, but the only thing changed here is the report of connectionState

Also, the switch itself is a bit strange to me, especially the second case. If you are disconnected, and you are connecting, why should it go disconnecting?

Where does this happen? We report disconnected if we were not connected previously, so theres nothing to disconnect, that is why we instantly report that we are disconnected. If we are connecting, or waiting for connection or connected, then we need to call disconnect to the engine, so the engine will eventually report the disconnected state.

Copy link
Member Author

@nuno-vieira nuno-vieira Nov 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is quite clear actually:

When the state is initialized, .disconnected, and disconnecting, and the user disconnects, we instantly report disconnected since the WebSocket engine won't report anything because it is already disconnected.

When the state is connected, connecting or waiting for connection, we need to call the engine.disconnect() and the websocket delegate will report the disconnected state. This is why here we report disconnecting instead of disconnected.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, now after seeing the diagram and the explanations, it makes more sense, thanks.

}
}

func timeout() {
let previousState = connectionState
connectionState = .disconnected(source: .timeout(from: previousState))

engineQueue.async { [engine, eventsBatcher] in
engine?.disconnect()

eventsBatcher.processImmediately {}
eventsBatcher.processImmediately(completion: completion)
}
log.error("Connection timed out. `\(connectionState)", subsystems: .webSocket)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ final class DefaultConnectionRecoveryHandler: ConnectionRecoveryHandler {
private var reconnectionStrategy: RetryStrategy
private var reconnectionTimer: TimerControl?
private let keepConnectionAliveInBackground: Bool
private var reconnectionTimeoutHandler: StreamTimer?

// MARK: - Init

Expand All @@ -49,8 +48,7 @@ final class DefaultConnectionRecoveryHandler: ConnectionRecoveryHandler {
internetConnection: InternetConnection,
reconnectionStrategy: RetryStrategy,
reconnectionTimerType: Timer.Type,
keepConnectionAliveInBackground: Bool,
reconnectionTimeoutHandler: StreamTimer?
keepConnectionAliveInBackground: Bool
) {
self.webSocketClient = webSocketClient
self.eventNotificationCenter = eventNotificationCenter
Expand All @@ -61,7 +59,6 @@ final class DefaultConnectionRecoveryHandler: ConnectionRecoveryHandler {
self.reconnectionStrategy = reconnectionStrategy
self.reconnectionTimerType = reconnectionTimerType
self.keepConnectionAliveInBackground = keepConnectionAliveInBackground
self.reconnectionTimeoutHandler = reconnectionTimeoutHandler
}

func start() {
Expand All @@ -71,7 +68,6 @@ final class DefaultConnectionRecoveryHandler: ConnectionRecoveryHandler {
func stop() {
unsubscribeFromNotifications()
cancelReconnectionTimer()
reconnectionTimeoutHandler?.stop()
}

deinit {
Expand All @@ -94,11 +90,6 @@ private extension DefaultConnectionRecoveryHandler {
name: .internetConnectionAvailabilityDidChange,
object: nil
)

reconnectionTimeoutHandler?.onChange = { [weak self] in
self?.webSocketClient.timeout()
self?.cancelReconnectionTimer()
}
}

func unsubscribeFromNotifications() {
Expand Down Expand Up @@ -177,17 +168,13 @@ extension DefaultConnectionRecoveryHandler {
switch state {
case .connecting:
cancelReconnectionTimer()
if reconnectionTimeoutHandler?.isRunning == false {
reconnectionTimeoutHandler?.start()
}

case .connected:
extensionLifecycle.setAppState(isReceivingEvents: true)
reconnectionStrategy.resetConsecutiveFailures()
syncRepository.syncLocalState {
log.info("Local state sync completed", subsystems: .offlineSupport)
}
reconnectionTimeoutHandler?.stop()

case .disconnected:
extensionLifecycle.setAppState(isReceivingEvents: false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import Foundation
/// Mock implementation of `ChatClientUpdater`
final class ConnectionRepository_Mock: ConnectionRepository, Spy {
enum Signature {
static let initialize = "initialize()"
static let connect = "connect(completion:)"
static let disconnect = "disconnect(source:completion:)"
static let forceConnectionInactiveMode = "forceConnectionStatusForInactiveModeIfNeeded()"
Expand Down Expand Up @@ -58,6 +59,10 @@ final class ConnectionRepository_Mock: ConnectionRepository, Spy {

// MARK: - Overrides

override func initialize() {
record()
}

override func connect(completion: ((Error?) -> Void)? = nil) {
record()
if let result = connectResult {
Expand Down
Loading
Loading