From 9bc9585d87d5dbb0cad5a8c10b52f0f1800d36aa Mon Sep 17 00:00:00 2001 From: Matthias Heinzel Date: Wed, 11 Dec 2024 12:47:55 +0100 Subject: [PATCH] create Merge as part of MergingRun.new --- .../LSMTree/Internal/MergeSchedule.hs | 21 ++++++++++----- src/Database/LSMTree/Internal/MergingRun.hs | 27 ++++++++++++++++--- src/Database/LSMTree/Internal/Snapshot.hs | 11 +++----- 3 files changed, 41 insertions(+), 18 deletions(-) diff --git a/src/Database/LSMTree/Internal/MergeSchedule.hs b/src/Database/LSMTree/Internal/MergeSchedule.hs index 7d51c7910..c8c5cdb82 100644 --- a/src/Database/LSMTree/Internal/MergeSchedule.hs +++ b/src/Database/LSMTree/Internal/MergeSchedule.hs @@ -705,6 +705,7 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = TraceExpectCompletedMerge (Run.runFsPathsNumber r) pure r + -- Takes ownership of the runs passed. newMerge :: MergePolicyForLevel -> Merge.Level -> LevelNo @@ -725,13 +726,19 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = !runPaths = Paths.runPath root (uniqueToRunNumber n) traceWith tr $ AtLevel ln $ TraceNewMerge (V.map Run.size rs) (runNumber runPaths) caching alloc mergePolicy mergeLevel - -- TODO: creating the merge should happen in MR.new - mergeMaybe <- allocateMaybeTemp reg - (Merge.new hfs hbio caching alloc mergeLevel resolve runPaths rs) - Merge.abort - mr <- case mergeMaybe of - Nothing -> error "newMerge: merges can not be empty" - Just m -> allocateTemp reg (MR.new mergePolicy rs m) releaseRef + -- TODO: There currently is a resource management bug that happens if an + -- exception occurs after calling MR.new. In this case, all changes roll + -- back, so some of the runs in rs will live in the Levels structure at + -- their original places again. However, we passed their references to + -- the MergingRun, which gets aborted, releasing the run references. + -- Instead of passing the original references into newMerge, we have to + -- duplicate the ones that previously existed in the level and then + -- freeTemp the original ones. This way, on the happy path the result is + -- the same, but if an exception occurs, the original references do not + -- get released. + mr <- allocateTemp reg + (MR.new hfs hbio resolve caching alloc mergeLevel mergePolicy runPaths rs) + releaseRef case confMergeSchedule of Incremental -> pure () OneShot -> do diff --git a/src/Database/LSMTree/Internal/MergingRun.hs b/src/Database/LSMTree/Internal/MergingRun.hs index 50934ae6f..fe0f99060 100644 --- a/src/Database/LSMTree/Internal/MergingRun.hs +++ b/src/Database/LSMTree/Internal/MergingRun.hs @@ -32,15 +32,21 @@ import Control.Monad.Class.MonadThrow (MonadCatch (bracketOnError), MonadMask) import Control.Monad.Primitive import Control.RefCount +import Data.Maybe (fromMaybe) import Data.Primitive.MutVar import Data.Primitive.PrimVar import qualified Data.Vector as V import Database.LSMTree.Internal.Assertions (assert) import Database.LSMTree.Internal.Entry (NumEntries (..), unNumEntries) +import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue) import Database.LSMTree.Internal.Merge (Merge, StepResult (..)) import qualified Database.LSMTree.Internal.Merge as Merge +import Database.LSMTree.Internal.Paths (RunFsPaths (..)) import Database.LSMTree.Internal.Run (Run) import qualified Database.LSMTree.Internal.Run as Run +import Database.LSMTree.Internal.RunAcc (RunBloomFilterAlloc) +import System.FS.API (HasFS) +import System.FS.BlockIO.API (HasBlockIO) data MergingRun m h = MergingRun { mergePolicy :: !MergePolicyForLevel @@ -104,16 +110,29 @@ instance NFData MergeKnownCompleted where rnf MergeKnownCompleted = () rnf MergeMaybeCompleted = () -{-# SPECIALISE new :: MergePolicyForLevel -> V.Vector (Ref (Run IO h)) -> Merge IO h -> IO (Ref (MergingRun IO h)) #-} +{-# SPECIALISE new :: HasFS IO h -> HasBlockIO IO h -> ResolveSerialisedValue -> Run.RunDataCaching -> RunBloomFilterAlloc -> Merge.Level -> MergePolicyForLevel -> RunFsPaths -> V.Vector (Ref (Run IO h)) -> IO (Ref (MergingRun IO h)) #-} -- | Create a new merging run, returning a reference to it that must ultimately -- be released via 'releaseRef'. +-- +-- Takes over ownership of the references to the runs passed. +-- +-- This function should be run with asynchronous exceptions masked to prevent +-- failing after internal resources have already been created. new :: (MonadMVar m, MonadMask m, MonadSTM m, MonadST m) - => MergePolicyForLevel + => HasFS m h + -> HasBlockIO m h + -> ResolveSerialisedValue + -> Run.RunDataCaching + -> RunBloomFilterAlloc + -> Merge.Level + -> MergePolicyForLevel + -> RunFsPaths -> V.Vector (Ref (Run m h)) - -> Merge m h -> m (Ref (MergingRun m h)) -new mergePolicy runs merge = do +new hfs hbio resolve caching alloc mergeLevel mergePolicy runPaths runs = do + merge <- fromMaybe (error "newMerge: merges can not be empty") + <$> Merge.new hfs hbio caching alloc mergeLevel resolve runPaths runs let numInputRuns = NumRuns $ V.length runs let numInputEntries = V.foldMap' Run.size runs spentCreditsVar <- SpentCreditsVar <$> newPrimVar 0 diff --git a/src/Database/LSMTree/Internal/Snapshot.hs b/src/Database/LSMTree/Internal/Snapshot.hs index 207dad879..5dc47e3af 100644 --- a/src/Database/LSMTree/Internal/Snapshot.hs +++ b/src/Database/LSMTree/Internal/Snapshot.hs @@ -339,14 +339,11 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve SnapCompletedMerge run -> MR.newCompleted mpfl nr ne run - SnapOngoingMerge runs spentCredits mergeLast -> do + SnapOngoingMerge runs spentCredits lvl -> do rn <- uniqueToRunNumber <$> incrUniqCounter uc - mergeMaybe <- allocateMaybeTemp reg - (Merge.new hfs hbio caching alloc mergeLast resolve (mkPath rn) runs) - Merge.abort - mr <- case mergeMaybe of - Nothing -> error "openLevels: merges can not be empty" - Just m -> MR.new mpfl runs m + mr <- allocateTemp reg + (MR.new hfs hbio resolve caching alloc lvl mpfl (mkPath rn) runs) + releaseRef -- When a snapshot is created, merge progress is lost, so we -- have to redo merging work here. UnspentCredits and