Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V0.4.2 #72

Merged
merged 4 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions src/main/Hangfire.Storage.SQLite/ExpirationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,8 @@ private int RemoveExpireRows(HangfireDbContext db, string table)

try
{
var _lock = new SQLiteDistributedLock(DistributedLockKey, DefaultLockTimeout,
db, db.StorageOptions);

using (_lock)
using (SQLiteDistributedLock.Acquire(DistributedLockKey, DefaultLockTimeout,
db, db.StorageOptions))
{
rowsAffected = db.Database.Execute(deleteScript);
}
Expand Down
10 changes: 7 additions & 3 deletions src/main/Hangfire.Storage.SQLite/Hangfire.Storage.SQLite.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<TargetFrameworks>netstandard2.0;net48</TargetFrameworks>
</PropertyGroup>
<PropertyGroup>
<Version>0.4.1</Version>
<Version>0.4.2</Version>
<Authors>RaisedApp</Authors>
<Company>RaisedApp</Company>
<Copyright>Copyright © 2019 - Present</Copyright>
Expand All @@ -20,8 +20,12 @@
<title>Hangfire Storage SQLite</title>
<Description>An Alternative SQLite Storage for Hangfire</Description>
<PackageReleaseNotes>
0.4.1
- Stability and retry enhancements introduced by: Daniel Lindblom
0.4.2
-remove re-entrancy (fixes SQLiteDistributedLock doesn't play right with async #68). Thanks to @kirides
-pause heartbeat timer while processing. Thanks to @kirides
-update expiration using SQL Update statement in a single step. Thanks to @kirides
-Added Heartbeat event (for testing). Thanks to @kirides
-if we no longer own the lock, we immediately dispose the heartbeat timer (fixes Unable to update heartbeat - still happening in .NET 6.0 #69). Thanks to @kirides
</PackageReleaseNotes>
</PropertyGroup>
<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public override void Dispose()
public override IDisposable AcquireDistributedLock(string resource, TimeSpan timeout)
{
return Retry.Twice((_) =>
new SQLiteDistributedLock($"HangFire:{resource}", timeout, DbContext, _storageOptions)
SQLiteDistributedLock.Acquire($"HangFire:{resource}", timeout, DbContext, _storageOptions)
);
}

Expand Down
188 changes: 85 additions & 103 deletions src/main/Hangfire.Storage.SQLite/SQLiteDistributedLock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
using Hangfire.Storage.SQLite.Entities;
using SQLite;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

namespace Hangfire.Storage.SQLite
Expand All @@ -14,9 +14,6 @@ public class SQLiteDistributedLock : IDisposable
{
private static readonly ILog Logger = LogProvider.For<SQLiteDistributedLock>();

private static readonly ThreadLocal<Dictionary<string, int>> AcquiredLocks
= new ThreadLocal<Dictionary<string, int>>(() => new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase));

private readonly string _resource;
private readonly string _resourceKey;

Expand All @@ -30,15 +27,17 @@ private static readonly ThreadLocal<Dictionary<string, int>> AcquiredLocks

private string EventWaitHandleName => string.Intern($@"{GetType().FullName}.{_resource}");

public event Action<bool> Heartbeat;

/// <summary>
/// Creates SQLite distributed lock
/// </summary>
/// <param name="resource">Lock resource</param>
/// <param name="timeout">Lock timeout</param>
/// <param name="database">Lock database</param>
/// <param name="storageOptions">Database options</param>
/// <exception cref="DistributedLockTimeoutException">Thrown if lock is not acuired within the timeout</exception>
public SQLiteDistributedLock(string resource, TimeSpan timeout, HangfireDbContext database,
private SQLiteDistributedLock(string resource,
HangfireDbContext database,
SQLiteStorageOptions storageOptions)
{
_resource = resource ?? throw new ArgumentNullException(nameof(resource));
Expand All @@ -50,22 +49,25 @@ public SQLiteDistributedLock(string resource, TimeSpan timeout, HangfireDbContex
{
throw new ArgumentException($@"The {nameof(resource)} cannot be empty", nameof(resource));
}
}

public static SQLiteDistributedLock Acquire(
string resource,
TimeSpan timeout,
HangfireDbContext database,
SQLiteStorageOptions storageOptions)
{
if (timeout.TotalSeconds > int.MaxValue)
{
throw new ArgumentException($"The timeout specified is too large. Please supply a timeout equal to or less than {int.MaxValue} seconds", nameof(timeout));
}

if (!AcquiredLocks.Value.ContainsKey(_resource) || AcquiredLocks.Value[_resource] == 0)
{
Cleanup();
Acquire(timeout);
AcquiredLocks.Value[_resource] = 1;
StartHeartBeat();
}
else
{
AcquiredLocks.Value[_resource]++;
}
var slock = new SQLiteDistributedLock(resource, database, storageOptions);

slock.Acquire(timeout);
slock.StartHeartBeat();

return slock;
}

/// <summary>
Expand All @@ -78,96 +80,52 @@ public void Dispose()
{
return;
}

_completed = true;
_heartbeatTimer?.Dispose();
Release();
}

if (!AcquiredLocks.Value.ContainsKey(_resource))
private bool TryAcquireLock()
{
Cleanup();
try
{
return;
}

AcquiredLocks.Value[_resource]--;
var distributedLock = new DistributedLock
{
Id = Guid.NewGuid().ToString(),
Resource = _resource,
ResourceKey = _resourceKey,
ExpireAt = DateTime.UtcNow.Add(_storageOptions.DistributedLockLifetime)
};

if (AcquiredLocks.Value[_resource] > 0)
{
return;
return _dbContext.Database.Insert(distributedLock) == 1;
}

// Timer callback may be invoked after the Dispose method call,
// but since we use the resource key, we will not disturb other owners.
AcquiredLocks.Value.Remove(_resource);

if (_heartbeatTimer != null)
catch (SQLiteException e) when (e.Result == SQLite3.Result.Constraint)
{
_heartbeatTimer.Dispose();
_heartbeatTimer = null;
return false;
}

Release();

Cleanup();
}

private void Acquire(TimeSpan timeout)
{
try
var sw = Stopwatch.StartNew();
do
{
var isLockAcquired = false;
var now = DateTime.UtcNow;
var lockTimeoutTime = now.Add(timeout);

while (lockTimeoutTime >= now)
if (TryAcquireLock())
{
Cleanup();

lock (EventWaitHandleName)
{
var result = _dbContext.DistributedLockRepository.FirstOrDefault(_ => _.Resource == _resource);

if (result == null)
{
try
{
var distributedLock = new DistributedLock();
distributedLock.Id = Guid.NewGuid().ToString();
distributedLock.Resource = _resource;
distributedLock.ResourceKey = _resourceKey;
distributedLock.ExpireAt = DateTime.UtcNow.Add(_storageOptions.DistributedLockLifetime);

_dbContext.Database.Insert(distributedLock);

// we were able to acquire the lock - break the loop
isLockAcquired = true;
break;
}
catch (SQLiteException e) when (e.Result == SQLite3.Result.Constraint)
{
// The lock already exists preventing us from inserting.
continue;
}
}
}

// we couldn't acquire the lock - wait a bit and try again
var waitTime = (int)timeout.TotalMilliseconds / 10;
lock (EventWaitHandleName)
Monitor.Wait(EventWaitHandleName, waitTime);

now = DateTime.UtcNow;
return;
}

if (!isLockAcquired)
var waitTime = (int) timeout.TotalMilliseconds / 10;
// either wait for the event to be raised, or timeout
lock (EventWaitHandleName)
{
throw new DistributedLockTimeoutException(_resource);
Monitor.Wait(EventWaitHandleName, waitTime);
}
}
catch (DistributedLockTimeoutException ex)
{
throw ex;
}
catch (Exception ex)
{
throw ex;
}
} while (sw.Elapsed <= timeout);

throw new DistributedLockTimeoutException(_resource);
}

/// <summary>
Expand All @@ -179,9 +137,12 @@ private void Release()
Retry.Twice((retry) => {

// Remove resource lock (if it's still ours)
_dbContext.DistributedLockRepository.Delete(_ => _.Resource == _resource && _.ResourceKey == _resourceKey);
lock (EventWaitHandleName)
Monitor.Pulse(EventWaitHandleName);
var count = _dbContext.DistributedLockRepository.Delete(_ => _.Resource == _resource && _.ResourceKey == _resourceKey);
if (count != 0)
{
lock (EventWaitHandleName)
Monitor.Pulse(EventWaitHandleName);
}
});
}

Expand All @@ -192,7 +153,7 @@ private void Cleanup()
Retry.Twice((_) => {
// Delete expired locks (of any owner)
_dbContext.DistributedLockRepository.
Delete(x => x.Resource == _resource && x.ExpireAt < DateTime.UtcNow);
Delete(x => x.Resource == _resource && x.ExpireAt < DateTime.UtcNow);
});
}
catch (Exception ex)
Expand All @@ -210,27 +171,48 @@ private void StartHeartBeat()

_heartbeatTimer = new Timer(state =>
{
// stop timer
_heartbeatTimer?.Change(Timeout.Infinite, Timeout.Infinite);
// Timer callback may be invoked after the Dispose method call,
// but since we use the resource key, we will not disturb other owners.
try
{
var distributedLock = _dbContext.DistributedLockRepository.FirstOrDefault(x => x.Resource == _resource && x.ResourceKey == _resourceKey);
if (distributedLock != null)
{
distributedLock.ExpireAt = DateTime.UtcNow.Add(_storageOptions.DistributedLockLifetime);

_dbContext.Database.Update(distributedLock);
}
else
var didUpdate = UpdateExpiration(_dbContext.DistributedLockRepository, DateTime.UtcNow.Add(_storageOptions.DistributedLockLifetime));
Heartbeat?.Invoke(didUpdate);
if (!didUpdate)
{
Logger.ErrorFormat("Unable to update heartbeat on the resource '{0}'. The resource is not locked or is locked by another owner.", _resource);

// if we no longer have a lock, stop the heartbeat immediately
_heartbeatTimer?.Dispose();
return;
}
}
catch (Exception ex)
{
Logger.ErrorFormat("Unable to update heartbeat on the resource '{0}'. {1}", _resource, ex);
}
// restart timer
_heartbeatTimer?.Change(timerInterval, timerInterval);
}, null, timerInterval, timerInterval);
}

private bool UpdateExpiration(TableQuery<DistributedLock> tableQuery, DateTime expireAt)
{
var expireColumn = tableQuery.Table.FindColumnWithPropertyName(nameof(DistributedLock.ExpireAt)).Name;
var resourceColumn = tableQuery.Table.FindColumnWithPropertyName(nameof(DistributedLock.Resource)).Name;
var resourceKeyColumn = tableQuery.Table.FindColumnWithPropertyName(nameof(DistributedLock.ResourceKey)).Name;
var table = tableQuery.Table.TableName;

var command = tableQuery.Connection.CreateCommand($@"UPDATE ""{table}""
SET ""{expireColumn}"" = ?
WHERE ""{resourceColumn}"" = ?
AND ""{resourceKeyColumn}"" = ?",
expireAt,
_resource,
_resourceKey);

return command.ExecuteNonQuery() != 0;
}
}
}
}
27 changes: 27 additions & 0 deletions src/main/Hangfire.Storage.SQLite/SQLiteStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,24 @@ public class SQLiteStorage : JobStorage, IDisposable
private readonly SQLiteDbConnectionFactory _dbConnectionFactory;

private readonly SQLiteStorageOptions _storageOptions;

private readonly Dictionary<string, bool> _features = new Dictionary<string, bool>(StringComparer.OrdinalIgnoreCase)
{
{ "Storage.ExtendedApi", false },
{ "Job.Queue", true },
{ "Connection.GetUtcDateTime", false },
{ "Connection.BatchedGetFirstByLowestScoreFromSet", false },
{ "Connection.GetSetContains", true },
{ "Connection.GetSetCount.Limited", false },
{ "BatchedGetFirstByLowestScoreFromSet", false },
{ "Transaction.AcquireDistributedLock", true },
{ "Transaction.CreateJob", true },
{ "Transaction.SetJobParameter", true },
{ "TransactionalAcknowledge:InMemoryFetchedJob", false },
{ "Monitoring.DeletedStateGraphs", false },
{ "Monitoring.AwaitingJobs", false }
};

private ConcurrentQueue<PooledHangfireDbContext> _dbContextPool = new ConcurrentQueue<PooledHangfireDbContext>();

/// <summary>
Expand Down Expand Up @@ -113,6 +131,15 @@ private void EnqueueOrPhaseOut(PooledHangfireDbContext dbContext)
}
}

public override bool HasFeature(string featureId)
{
if (featureId == null) throw new ArgumentNullException(nameof(featureId));

return _features.TryGetValue(featureId, out var isSupported)
? isSupported
: base.HasFeature(featureId);
}

/// <summary>
/// Returns text representation of the object
/// </summary>
Expand Down
13 changes: 12 additions & 1 deletion src/samples/WebSample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
.UseSQLiteStorage("Hangfire.db")
.UseHeartbeatPage(checkInterval: TimeSpan.FromSeconds(10))
.UseJobsLogger());
services.AddHangfireServer();
services.AddHangfireServer(options =>
{
options.Queues = new[] { "test_queue_1", "default" };
});

var app = builder.Build();

Expand All @@ -27,4 +30,12 @@
RecurringJob.AddOrUpdate("TaskMethod()", (TaskSample t) => t.TaskMethod(), Cron.Minutely);
RecurringJob.AddOrUpdate("TaskMethod2()", (TaskSample t) => t.TaskMethod2(null), Cron.Minutely);

var t = app.Services.GetService<IBackgroundJobClient>();
t.Enqueue(queue: "test_queue_1", methodCall: () => Console.WriteLine("Testing......"));
t.Enqueue(queue: "test_queue_1", methodCall: () => Console.WriteLine("Testing......"));
t.Enqueue(queue: "test_queue_1", methodCall: () => Console.WriteLine("Testing......"));
t.Enqueue(queue: "test_queue_1", methodCall: () => Console.WriteLine("Testing......"));
t.Enqueue(queue: "test_queue_1", methodCall: () => Console.WriteLine("Testing......"));
t.Enqueue(queue: "test_queue_1", methodCall: () => Console.WriteLine("Testing......"));

app.Run();
Loading
Loading