Skip to content

Commit

Permalink
History updates
Browse files Browse the repository at this point in the history
  • Loading branch information
rossnichols committed Jun 13, 2021
1 parent fa747fc commit ce87c92
Show file tree
Hide file tree
Showing 10 changed files with 394 additions and 304 deletions.
196 changes: 106 additions & 90 deletions ABGP/Libs/LibLedger/LibLedger.lua
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
-- controller:
-- GetLedger(): table (entries), number (baseline)
-- GetEntryInfo(entry): id, entryDate
-- GetEntryDate(id): number
-- GetVersion(): any
-- IsSelf(string): boolean
-- GetTime(): number
Expand Down Expand Up @@ -29,7 +29,6 @@ else
end

local math = math;
local ipairs = ipairs;
local pairs = pairs;
local bit = bit;
local next = next;
Expand All @@ -54,7 +53,7 @@ end

function LibLedger:SyncNewEntries(controller, entries)
local privileged = controller:CanWriteEntries();
local ledger, baseline = controller:GetLedger();
local _, baseline = controller:GetLedger();
local invalidLedger = (baseline == _invalidBaseline);
local canSendEntries = privileged and not invalidLedger;

Expand Down Expand Up @@ -83,23 +82,23 @@ function LibLedger:HandleComm(controller, data, sender)
end
end

function LibLedger:_SendComm(controller, data, target)
controller:Log("Comm: %s to %s", data.name, target or "everyone");
controller:SendComm(data, target);
end

function LibLedger:HasValidLedger(controller)
local ledger, baseline = controller:GetLedger();
local _, baseline = controller:GetLedger();
return (baseline ~= _invalidBaseline);
end

function LibLedger:SetInvalidLedger(controller)
controller:SetBaseline(_invalidBaseline);
end

function LibLedger:_SendComm(controller, data, target)
controller:Log("Comm: %s to %s", data.name, target or "everyone");
controller:SendComm(data, target);
end

function LibLedger:_SyncWorker(controller, token, now)
local privileged = controller:CanWriteEntries();
local ledger, baseline = controller:GetLedger();
local _, baseline = controller:GetLedger();
local invalidLedger = (baseline == _invalidBaseline);
local canSendEntries = privileged and not invalidLedger;

Expand All @@ -111,13 +110,15 @@ function LibLedger:_SyncWorker(controller, token, now)
now = now,
version = controller:GetVersion(),

hashes = {},
ids = canSendEntries and {} or nil,
handled = {},
};

if not invalidLedger then
self:_BuildSyncHashData(controller, commData, now);
local hashBuckets, idBuckets = self:_BuildSyncBuckets(controller, now);
commData.hashes = hashBuckets;
if canSendEntries then
commData.ids = idBuckets;
end
end

if commData.ids then
Expand All @@ -133,34 +134,42 @@ function LibLedger:_SyncWorker(controller, token, now)
self:_SendComm(controller, commData);
end

function LibLedger:_BuildSyncHashData(controller, ledgerData, now, addAllIds)
function LibLedger:_BuildSyncBuckets(controller, now)
local ledger = controller:GetLedger();
local firstThreshold, otherThresholds = controller:GetSyncThresholds();

local hashBuckets = {};
local idBuckets = {};

local bucket = 1;
ledgerData.hashes[bucket] = 0;
if ledgerData.ids then
ledgerData.ids[bucket] = {};
end
local bucketCount = 0;
hashBuckets[bucket] = 0;
idBuckets[bucket] = {};
local threshold = firstThreshold;

for _, entry in ipairs(ledger) do
local id, entryDate = controller:GetEntryInfo(entry);
for i = #ledger.ids, 1, -1 do
local id = ledger.ids[i];
local entryDate = controller:GetEntryDate(id);
while now - entryDate > threshold do
controller:Log("%d ids in bucket %d", bucketCount, bucket);
bucketCount = 0;
bucket = bucket + 1;
ledgerData.hashes[bucket] = 0;
if ledgerData.ids and addAllIds then
ledgerData.ids[bucket] = {};
end
hashBuckets[bucket] = 0;
idBuckets[bucket] = {};
threshold = threshold + otherThresholds;
end
-- controller:Log("Id %s at time %d is in bucket %d", id, entryDate, bucket);

ledgerData.hashes[bucket] = bit.bxor(ledgerData.hashes[bucket], Hash(id));
if ledgerData.ids and ledgerData.ids[bucket] then
ledgerData.ids[bucket][id] = true;
end
bucketCount = bucketCount + 1;
hashBuckets[bucket] = bit.bxor(hashBuckets[bucket], Hash(id));
idBuckets[bucket][id] = true;
end

if hashBuckets[bucket] ~= 0 then
controller:Log("%d ids in bucket %d", bucketCount, bucket);
end

return hashBuckets, idBuckets;
end

function LibLedger:_GetWorkingTable(controller)
Expand Down Expand Up @@ -215,28 +224,24 @@ function LibLedger:_LEDGER_SYNC(controller, data, sender)
return;
end

local ledgerData = {
ids = canSendEntries and {} or nil,
hashes = {},
};
self:_BuildSyncHashData(controller, ledgerData, data.now, true);

if not data.ids and canSendEntries then
-- The sender sent hashes of their entries. If any of our hashes are different,
-- we'll send them ids and they can request whatever they want.
for bucket, hash in pairs(ledgerData.hashes) do
if hash == (data.hashes[bucket] or 0) then
ledgerData.ids[bucket] = nil;
local hashBuckets, idBuckets = self:_BuildSyncBuckets(controller, data.now);

for bucket, hash in pairs(hashBuckets) do
if hash == (hashBuckets[bucket] or 0) then
idBuckets[bucket] = nil;
else
controller:Log("Bucket %d from %s has a mismatched hash (no ids).", bucket, sender);
end
end

if next(ledgerData.ids) then
if next(idBuckets) then
local bucketCount = 0;
for bucket, ids in pairs(ledgerData.ids) do
for bucket, ids in pairs(idBuckets) do
bucketCount = bucketCount + 1;
ledgerData.ids[bucket] = controller:PrepareIds(ids, data.now);
idBuckets[bucket] = controller:PrepareIds(ids, data.now);
end
controller:Log("Sending sync (with ids) for %d buckets to %s", bucketCount, sender);
self:_SendComm(controller, {
Expand All @@ -247,14 +252,15 @@ function LibLedger:_LEDGER_SYNC(controller, data, sender)
now = data.now,
version = data.version,

hashes = ledgerData.hashes,
ids = ledgerData.ids,
hashes = hashBuckets,
ids = idBuckets,
handled = {},
}, sender);
end
elseif data.ids and senderIsPrivileged then
-- The sender sent ids. First check hashes to see if we need to
-- sync any more buckets.
-- The sender sent ids. First check hashes to see if we need to sync any more buckets.
local hashBuckets, idBuckets = self:_BuildSyncBuckets(controller, data.now);

local needsSync = false;
local bucketCount = 0;
local commData = {
Expand All @@ -269,7 +275,7 @@ function LibLedger:_LEDGER_SYNC(controller, data, sender)
ids = canSendEntries and {} or nil,
handled = {},
};
for bucket, hash in pairs(ledgerData.hashes) do
for bucket, hash in pairs(hashBuckets) do
commData.hashes[bucket] = hash;
if hash == (data.hashes[bucket] or 0) then
-- We have the same hash as the sender.
Expand All @@ -284,8 +290,8 @@ function LibLedger:_LEDGER_SYNC(controller, data, sender)
controller:Log("Bucket %d from %s has a mismatched hash (ids)", bucket, sender);
needsSync = true;
bucketCount = bucketCount + 1;
if canSendEntries then
commData.ids[bucket] = ledgerData.ids[bucket];
if commData.ids then
commData.ids[bucket] = idBuckets[bucket];
end
end
end
Expand All @@ -302,29 +308,23 @@ function LibLedger:_LEDGER_SYNC(controller, data, sender)
end

-- Now go through the ids looking for ones we need, or ones to send if allowed.
local bucket = 1;
local firstThreshold, otherThresholds = controller:GetSyncThresholds();
local threshold = firstThreshold;
local hashBuckets, idBuckets = self:_BuildSyncBuckets(controller, data.now);
local merge = {};
local requested = {};
local mergeCount = 0;
local requestCount = 0;

for _, entry in ipairs(ledger) do
local id, entryDate = controller:GetEntryInfo(entry);
while data.now - entryDate > threshold do
bucket = bucket + 1;
threshold = threshold + otherThresholds;
end

for bucket, ids in pairs(idBuckets) do
if data.ids[bucket] then
if data.ids[bucket][id] then
-- We both have this entry.
data.ids[bucket][id] = nil;
elseif canSendEntries then
-- Sender doesn't have this entry.
merge[id] = entry;
mergeCount = mergeCount + 1;
for id in pairs(ids) do
if data.ids[bucket][id] then
-- We both have this entry.
data.ids[bucket][id] = nil;
elseif canSendEntries then
-- Sender doesn't have this entry.
merge[id] = ledger.entries[id];
mergeCount = mergeCount + 1;
end
end
end
end
Expand Down Expand Up @@ -371,16 +371,10 @@ function LibLedger:_LEDGER_MERGE(controller, data, sender)
-- The sender is requesting entries.
local merge = {};
local mergeCount = 0;
for _, entry in ipairs(ledger) do
local id = controller:GetEntryInfo(entry);
if data.requested[id] then
-- Sender wants this entry.
merge[id] = entry;
for id in pairs(data.requested) do
if ledger.entries[id] then
merge[id] = ledger.entries[id];
mergeCount = mergeCount + 1;
data.requested[id] = nil;
if not next(data.requested) then
break;
end
end
end

Expand Down Expand Up @@ -432,26 +426,30 @@ function LibLedger:_LEDGER_MERGE(controller, data, sender)

if next(data.merge) and senderIsPrivileged then
-- The sender is sharing entries. First remove the ones we already have.
for _, entry in ipairs(ledger) do
local id = controller:GetEntryInfo(entry);
data.merge[id] = nil;
for id in pairs(data.merge) do
if ledger.entries[id] then
data.merge[id] = nil;
end
end

-- At this point, data.merge contains entries we don't have. Add them
-- to the ledger and then sort it by date.
if next(data.merge) then
local mergeCount = 0;
for _, entry in pairs(data.merge) do
local id = controller:GetEntryInfo(entry);
table.insert(ledger, 1, entry);
for id, entry in pairs(data.merge) do
ledger.entries[id] = entry;
table.insert(ledger.ids, id);
mergeCount = mergeCount + 1;
end

table.sort(ledger, function(a, b)
local _, aDate = controller:GetEntryInfo(a);
local _, bDate = controller:GetEntryInfo(b);

return aDate > bDate;
table.sort(ledger.ids, function(a, b)
local aDate = controller:GetEntryDate(a);
local bDate = controller:GetEntryDate(b);
if aDate == bDate then
return a < b;
else
return aDate < bDate;
end
end);
controller:Log("Received %d entries from %s", mergeCount, sender);
controller:OnEntriesSynced(data.merge, sender);
Expand All @@ -462,7 +460,7 @@ end
function LibLedger:_LEDGER_REPLACE_INIT(controller, data, sender)
local privileged = controller:CanWriteEntries();
local senderIsPrivileged = controller:CanWriteEntries(sender);
local ledger, baseline = controller:GetLedger();
local _, baseline = controller:GetLedger();
local invalidLedger = (baseline == _invalidBaseline);
local canSendEntries = privileged and not invalidLedger;

Expand Down Expand Up @@ -514,7 +512,7 @@ function LibLedger:_LEDGER_REPLACE_REQUEST(controller, data, sender)
now = data.now,
version = data.version,

ledger = controller:PrepareEntries(ledger, data.now),
entries = controller:PrepareEntries(ledger.entries, data.now),
baseline = baseline
});
else
Expand All @@ -530,7 +528,7 @@ function LibLedger:_LEDGER_REPLACE_REQUEST(controller, data, sender)
now = data.now,
version = data.version,

ledger = controller:PrepareEntries(ledger, data.now),
entries = controller:PrepareEntries(ledger.entries, data.now),
baseline = baseline
}, sender);
end
Expand All @@ -539,15 +537,33 @@ end
function LibLedger:_LEDGER_REPLACE(controller, data, sender)
local privileged = controller:CanWriteEntries();
local senderIsPrivileged = controller:CanWriteEntries(sender);
local ledger, baseline = controller:GetLedger();
local _, baseline = controller:GetLedger();
local invalidLedger = (baseline == _invalidBaseline);
local canSendEntries = privileged and not invalidLedger;

-- The sender has given us an updated ledger.
if not senderIsPrivileged or data.baseline <= baseline then return; end

controller:Log("Got new ledger from %s", sender);
data.ledger = controller:RebuildEntries(data.ledger, data.now);
controller:SetLedger(data.ledger);
local newLedger = {
entries = controller:RebuildEntries(data.entries, data.now),
ids = {},
};
for id in pairs(newLedger.entries) do
table.insert(newLedger.ids, id);
end
table.sort(newLedger.ids, function(a, b)
local aDate = controller:GetEntryDate(a);
local bDate = controller:GetEntryDate(b);
if aDate == bDate then
return a < b;
else
return aDate < bDate;
end
end);

controller:SetLedger(newLedger);
controller:SetBaseline(data.baseline);
end

return LibLedger;
Loading

0 comments on commit ce87c92

Please sign in to comment.