Skip to content

Commit

Permalink
Better websocket connection semantics. Re #18.
Browse files Browse the repository at this point in the history
  • Loading branch information
bchavez committed Aug 30, 2020
1 parent 5db11a2 commit 8b01a76
Show file tree
Hide file tree
Showing 6 changed files with 219 additions and 10 deletions.
3 changes: 3 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## v3.2.0
* Added `ConnectResult` return type from `CoinbaseProWebSocket.ConnectAsync()` for better semantic connection handling.

## v3.0.5
* Fixed `Withdrawals.GetWithdrawal()`. Previously used wrong URL path.

Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,9 @@ Be sure to [check the documentation here](https://docs.pro.coinbase.com/?r=1#sub
var socket = ...; //using authenticated or unauthenticated instance
//Connect the websocket,
//when this connect method completes, the socket is ready
await socket.ConnectAsync();
//when this connect method completes, the socket is ready or failure occured.
var result = await socket.ConnectAsync();
if( !result.Success ) throw new Exception("Failed to connect.");

//add an event handler for the message received event on the raw socket
socket.RawSocket.MessageReceived += RawSocket_MessageReceived;
Expand Down
38 changes: 33 additions & 5 deletions Source/Coinbase.Pro/WebSockets/CoinbaseProWebSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,20 @@ public void EnsureValid()
}
}

public class ConnectResult
{
public ConnectResult(bool success, object sender, EventArgs eventArgs)
{
this.Success = success;
this.Sender = sender;
this.EventArgs = eventArgs;
}

public bool Success { get; }
public object Sender { get; }
public EventArgs EventArgs { get; }
}

public class CoinbaseProWebSocket : IDisposable
{
public const string Endpoint = "wss://ws-feed.pro.coinbase.com";
Expand All @@ -38,22 +52,22 @@ public CoinbaseProWebSocket(WebSocketConfig config = null)

public WebSocketConfig Config { get; }

protected TaskCompletionSource<bool> connectingTcs;
protected TaskCompletionSource<ConnectResult> connectingTcs;

protected IProxyConnector Proxy { get; set; }

/// <summary>
/// Connect the websocket to Coinbase Pro.
/// </summary>
/// <returns></returns>
public Task ConnectAsync()
public Task<ConnectResult> ConnectAsync()
{
if( this.RawSocket != null ) throw new InvalidOperationException(
$"The {nameof(RawSocket)} is already created from a previous {nameof(ConnectAsync)} call. " +
$"If you get this exception, you'll need to dispose of this {nameof(CoinbaseProWebSocket)} and create a new instance. " +
$"Don't call {nameof(ConnectAsync)} multiple times on the same instance.");

this.connectingTcs = new TaskCompletionSource<bool>();
this.connectingTcs = new TaskCompletionSource<ConnectResult>();

if( this.RawSocket is null )
{
Expand All @@ -63,19 +77,33 @@ public Task ConnectAsync()
}

this.RawSocket.Opened += RawSocket_Opened;
this.RawSocket.Error += RawSocket_Error;
this.RawSocket.Open();

return this.connectingTcs.Task;
}

private void RawSocket_Error(object sender, ErrorEventArgs e)
{
TrySetConnectResult(false, sender, e);
}

private void RawSocket_Opened(object sender, EventArgs e)
{
TrySetConnectResult(true, sender, e);
}

protected void TrySetConnectResult(bool result, object sender, EventArgs args)
{
var connectResult = new ConnectResult(result, sender, args);

if( sender is WebSocket socket )
{
socket.Opened -= RawSocket_Opened;
socket.Error -= RawSocket_Error;
}
Task.Run(() => this.connectingTcs.SetResult(true));

Task.Run(() => this.connectingTcs.TrySetResult(connectResult));
}

public void EnableFiddlerDebugProxy(IProxyConnector proxy)
Expand Down
12 changes: 10 additions & 2 deletions Source/Coinbase.Tests/IntegrationTests/WebSocketTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ public void BeforeEachTest()
[Test]
public async Task connect()
{
await socket.ConnectAsync();
var result = await socket.ConnectAsync();
if( !result.Success )
{
throw new Exception("Connection error.");
}

//https://docs.pro.coinbase.com/?r=1#protocol-overview
// Request
Expand Down Expand Up @@ -80,7 +84,11 @@ private void RawSocket_MessageReceived(object sender, WebSocket4Net.MessageRecei
[Test]
public async Task connect_simple()
{
await socket.ConnectAsync();
var result = await socket.ConnectAsync();
if( !result.Success )
{
throw new Exception("Connect error.");
}

var sub = new Subscription
{
Expand Down
4 changes: 3 additions & 1 deletion Source/Examples/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ public static async Task SubscribeToWebsocketEvents(Credentials creds)
//socket.EnableFiddlerDebugProxy(new HttpConnectProxy(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 8888)));
#endif

await socket.ConnectAsync();
var result = await socket.ConnectAsync();
if( !result.Success )
throw new Exception("Connect error.");

WriteLine(">> Connected.");

Expand Down
167 changes: 167 additions & 0 deletions Source/Examples/ResilientWebSocket.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
using System;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Coinbase.Pro.Models;
using Coinbase.Pro.WebSockets;
using SuperSocket.ClientEngine;
using SuperSocket.ClientEngine.Proxy;
using WebSocket4Net;
using static System.Console;

namespace Examples
{
public class ResilientWebSocket
{
private readonly Credentials credentials;
private CoinbaseProWebSocket coinbase;
private Subscription subscription;
private SemaphoreSlim locker = new SemaphoreSlim(1, 1);
private CancellationTokenSource cts;

public ResilientWebSocket(Credentials credentials)
{
this.credentials = credentials;
}

public Task Start(Subscription subscription)
{
this.subscription = subscription;
this.cts = new CancellationTokenSource();
return Task.Run(() => SafeReconnect());
}

public async Task Stop()
{
this.cts?.Cancel();

WriteLine("Waiting 80 sec for shutdown...");
this.locker.Wait(80_000);

WriteLine("Shutdown complete.");
WriteLine("!! Websocket is closed! ResilientWebSocket stopped.");
}

private async Task Reconnect(Credentials creds, Subscription subscription)
{
if (this.cts.IsCancellationRequested) return;

this.coinbase = new CoinbaseProWebSocket(new WebSocketConfig
{
ApiKey = creds.ApiKey,
Secret = creds.ApiSecret,
Passphrase = creds.ApiPassphrase,
SocketUri = "wss://ws-feed-public.sandbox.pro.coinbase.com"
});

WriteLine(">> Connecting websocket...");

//Uncomment depending on your TFM if you want to debug the websocket
//connection to Coinbase Pro with Fiddler
#if !NETFRAMEWORK
coinbase.EnableFiddlerDebugProxy(new HttpConnectProxy(IPEndPoint.Parse("127.0.0.1:8888")));
#else
coinbase.EnableFiddlerDebugProxy(new HttpConnectProxy(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 8888)));
#endif

var result = await coinbase.ConnectAsync();
if( !result.Success )
{
var ex = new Exception("Connect failed.")
{
Data = {{"ConnectResult", result}}
};
throw ex;
}

WriteLine(">> Connected.");

coinbase.RawSocket.Closed += Websocket_Closed;
coinbase.RawSocket.Error += Websocket_Error;
coinbase.RawSocket.MessageReceived += Websocket_MessageReceived;

WriteLine(">> Subscribing to events...");
var sub = new Subscription
{
Channels = subscription.Channels,
ProductIds = subscription.ProductIds
};
await coinbase.SubscribeAsync(sub);

WriteLine(">> Subscribed.");
}

private void Websocket_MessageReceived(object sender, MessageReceivedEventArgs e)
{
if (this.cts.IsCancellationRequested) return;

WriteLine("Message received.");
if (WebSocketHelper.TryParse(e.Message, out var msg))
{
if (msg is HeartbeatEvent hb)
{
WriteLine($"Sequence: {hb.Sequence}, Last Trade Id: {hb.LastTradeId}");
}
}
}

private void Websocket_Error(object sender, ErrorEventArgs e)
{
if (this.cts.IsCancellationRequested) return;

WriteLine("!! Websocket Error! ");
WriteLine(e);
}

private void Websocket_Closed(object sender, EventArgs e)
{
if (this.cts.IsCancellationRequested) return;

WriteLine("!! The websocket closed!");
WriteLine("!! Reconnecting...");
Task.Run(() => SafeReconnect());
}

private async Task SafeReconnect()
{
if( this.cts.IsCancellationRequested ) return;

if (!locker.Wait(0)) return; //any threads that can't acquire the lock, go away

while ( !this.cts.IsCancellationRequested )
{
try
{
SafeShutdown();
await Reconnect(this.credentials, this.subscription);
break;
}
catch(Exception e)
{
WriteLine(e);
}
}

locker.Release();
}

private void SafeShutdown()
{
if (this.coinbase?.RawSocket is null) return;
if (this.subscription is null) return;

this.coinbase.RawSocket.Closed -= Websocket_Closed;
this.coinbase.RawSocket.Error -= Websocket_Error;
this.coinbase.RawSocket.MessageReceived -= Websocket_MessageReceived;

if (this.coinbase.RawSocket.State == WebSocketState.Open)
{
this.coinbase.Unsubscribe(this.subscription);
WriteLine("!! Closing Websocket...");
this.coinbase.RawSocket.Close();
}

this.coinbase.Dispose();
}
}
}

0 comments on commit 8b01a76

Please sign in to comment.