Skip to content

Commit

Permalink
- #73 - implement a few of the missing features
Browse files Browse the repository at this point in the history
Signed-off-by: kirides <[email protected]>
  • Loading branch information
kirides committed Apr 19, 2024
1 parent 1850239 commit 8dc26d7
Show file tree
Hide file tree
Showing 10 changed files with 524 additions and 142 deletions.
3 changes: 3 additions & 0 deletions src/main/Hangfire.Storage.SQLite/Assembly.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("Hangfire.Storage.SQLite.Test")]
55 changes: 54 additions & 1 deletion src/main/Hangfire.Storage.SQLite/HangfireSQLiteConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,28 @@ public override long GetSetCount(string key)
.SetRepository
.Count(_ => _.Key == key);
}

public override long GetSetCount(IEnumerable<string> keys, int limit)
{
if (keys == null)
{
throw new ArgumentNullException(nameof(keys));
}

var count = DbContext
.SetRepository
.Where(_ => keys.Contains(_.Key))
.Take(limit)
.Count();
return Math.Min(count, limit);
}

public override bool GetSetContains(string key, string value)
{
return DbContext
.SetRepository
.Any(x => x.Key == key && x.Value == value);
}

public override string GetFirstByLowestScoreFromSet(string key, double fromScore, double toScore)
{
Expand All @@ -257,6 +279,32 @@ public override string GetFirstByLowestScoreFromSet(string key, double fromScore
.FirstOrDefault();
}

public override List<string> GetFirstByLowestScoreFromSet(string key, double fromScore, double toScore, int count)
{
if (key == null)
{
throw new ArgumentNullException(nameof(key));
}

if (toScore < fromScore)
{
throw new ArgumentException("The 'toScore' value must be higher or equal to the 'fromScore' value.");
}

var fromScoreDec = fromScore.ToInt64();
var toScoreDec = toScore.ToInt64();

return DbContext
.SetRepository
.Where(_ => _.Key == key &&
_.Score >= fromScoreDec &&
_.Score <= toScoreDec)
.OrderBy(_ => _.Score)
.Select(_ => _.Value)
.Take(count)
.ToList();
}

public override JobData GetJobData(string jobId)
{
if (jobId == null)
Expand Down Expand Up @@ -434,7 +482,7 @@ public override void SetJobParameter(string id, string name, string value)

public override void SetRangeInHash(string key, IEnumerable<KeyValuePair<string, string>> keyValuePairs)
{
using (var transaction = new SQLiteWriteOnlyTransaction(DbContext, _queueProviders))
using (var transaction = CreateWriteTransaction())
{
transaction.SetRangeInHash(key, keyValuePairs);
transaction.Commit();
Expand Down Expand Up @@ -561,6 +609,11 @@ public override TimeSpan GetListTtl(string key)
return result != DateTime.MinValue ? result - DateTime.UtcNow : TimeSpan.FromSeconds(-1);
}

public override DateTime GetUtcDateTime()
{
return DateTime.UtcNow;
}

public override List<string> GetRangeFromList(string key, int startingFrom, int endingAt)
{
if (key == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public void Add(IPersistentJobQueueProvider provider, IEnumerable<string> queues
/// <returns></returns>
public IPersistentJobQueueProvider GetProvider(string queue)
{
return _providersByQueue.ContainsKey(queue)
? _providersByQueue[queue]
return _providersByQueue.TryGetValue(queue, out var value)
? value
: _defaultProvider;
}

Expand Down
10 changes: 9 additions & 1 deletion src/main/Hangfire.Storage.SQLite/SQLiteDistributedLock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public void Dispose()

_completed = true;
_heartbeatTimer?.Dispose();
_heartbeatTimer = null;
Release();
}

Expand Down Expand Up @@ -184,7 +185,14 @@ private void StartHeartBeat()
Logger.ErrorFormat("Unable to update heartbeat on the resource '{0}'. The resource is not locked or is locked by another owner.", _resource);

// if we no longer have a lock, stop the heartbeat immediately
_heartbeatTimer?.Dispose();
try
{
_heartbeatTimer?.Dispose();
}
catch (ObjectDisposedException)
{
// well, already disposed?
}
return;
}
}
Expand Down
28 changes: 14 additions & 14 deletions src/main/Hangfire.Storage.SQLite/SQLiteStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,21 @@ public class SQLiteStorage : JobStorage, IDisposable

private readonly SQLiteStorageOptions _storageOptions;

private readonly Dictionary<string, bool> _features = new Dictionary<string, bool>(StringComparer.OrdinalIgnoreCase)
private static readonly Dictionary<string, bool> _features = new Dictionary<string, bool>(StringComparer.OrdinalIgnoreCase)
{
{ "Storage.ExtendedApi", false },
{ "Job.Queue", true },
{ "Connection.GetUtcDateTime", false },
{ "Connection.BatchedGetFirstByLowestScoreFromSet", false },
{ "Connection.GetSetContains", true },
{ "Connection.GetSetCount.Limited", false },
{ "BatchedGetFirstByLowestScoreFromSet", false },
{ "Transaction.AcquireDistributedLock", true },
{ "Transaction.CreateJob", true },
{ "Transaction.SetJobParameter", true },
{ "TransactionalAcknowledge:InMemoryFetchedJob", false },
{ "Monitoring.DeletedStateGraphs", false },
{ "Monitoring.AwaitingJobs", false }
{ JobStorageFeatures.ExtendedApi, true },
{ JobStorageFeatures.JobQueueProperty, true },
{ JobStorageFeatures.Connection.GetUtcDateTime, true },
{ JobStorageFeatures.Connection.BatchedGetFirstByLowest, true },
{ "BatchedGetFirstByLowestScoreFromSet", true }, // ^-- legacy name?
{ JobStorageFeatures.Connection.GetSetContains, true },
{ JobStorageFeatures.Connection.LimitedGetSetCount, true },
{ JobStorageFeatures.Transaction.AcquireDistributedLock, true },
{ JobStorageFeatures.Transaction.CreateJob, false }, // NOTE: implement SQLiteWriteOnlyTransaction.CreateJob(...)
{ JobStorageFeatures.Transaction.SetJobParameter, false }, // NOTE: implement SQLiteWriteOnlyTransaction.SetJobParameter(...)
{ JobStorageFeatures.Transaction.RemoveFromQueue(typeof(SQLiteFetchedJob)), false }, // NOTE: implement SQLiteWriteOnlyTransaction.RemoveFromQueue(...)
{ JobStorageFeatures.Monitoring.DeletedStateGraphs, false },
{ JobStorageFeatures.Monitoring.AwaitingJobs, false }
};

private ConcurrentQueue<PooledHangfireDbContext> _dbContextPool = new ConcurrentQueue<PooledHangfireDbContext>();
Expand Down
56 changes: 47 additions & 9 deletions src/main/Hangfire.Storage.SQLite/SQLiteWriteOnlyTransaction.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using Hangfire.Logging;
using Hangfire.States;
using Hangfire.Storage.SQLite.Entities;
using Newtonsoft.Json;
Expand All @@ -17,8 +16,10 @@ public class SQLiteWriteOnlyTransaction : JobStorageTransaction

private readonly PersistentJobQueueProviderCollection _queueProviders;

private static object _lockObject = new object();

internal readonly List<IDisposable> _acquiredLocks = new List<IDisposable>();

private static readonly ILog Logger = LogProvider.For<SQLiteWriteOnlyTransaction>();

/// <summary>
/// </summary>
/// <param name="connection"></param>
Expand All @@ -36,6 +37,15 @@ private void QueueCommand(Action<HangfireDbContext> action)
_commandQueue.Enqueue(action);
}

public override void AcquireDistributedLock(string resource, TimeSpan timeout)
{
var acquiredLock = SQLiteDistributedLock.Acquire(resource, timeout, _dbContext, _dbContext.StorageOptions);
lock (_acquiredLocks)
{
_acquiredLocks.Add(acquiredLock);
}
}

public override void AddJobState(string jobId, IState state)
{
QueueCommand(_ =>
Expand Down Expand Up @@ -113,16 +123,44 @@ public override void AddToSet(string key, string value, double score)

public override void Commit()
{
Retry.Twice((attempts) => {

lock (_lockObject)
{
try
{
Retry.Twice((attempts) => {
_commandQueue.ToList().ForEach(_ =>
{
_.Invoke(_dbContext);
});
});
}
finally
{
ReleaseAcquiredLocks();
}
}

private void ReleaseAcquiredLocks()
{
lock (_acquiredLocks)
{
foreach (var acquiredLock in _acquiredLocks)
{
try
{
acquiredLock.Dispose();
}
catch (Exception ex)
{
Logger.WarnException("Failed to release a distributed lock", ex);
}
}
});
_acquiredLocks.Clear();
}
}

public override void Dispose()
{
ReleaseAcquiredLocks();
base.Dispose();
}

/// <summary>
Expand Down
Loading

0 comments on commit 8dc26d7

Please sign in to comment.