Skip to content

Commit

Permalink
create Merge as part of MergingRun.new
Browse files Browse the repository at this point in the history
  • Loading branch information
mheinzel committed Dec 12, 2024
1 parent 0d42caa commit 3638038
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 18 deletions.
21 changes: 14 additions & 7 deletions src/Database/LSMTree/Internal/MergeSchedule.hs
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,7 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
TraceExpectCompletedMerge (Run.size r) (Run.runFsPathsNumber r)
pure r

-- | Takes ownership of the runs passed.
newMerge :: MergePolicyForLevel
-> Merge.Level
-> LevelNo
Expand All @@ -723,13 +724,19 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
!runPaths = Paths.runPath root (uniqueToRunNumber n)
traceWith tr $ AtLevel ln $
TraceNewMerge (V.map Run.size rs) (runNumber runPaths) caching alloc mergePolicy mergeLevel
-- TODO: creating the merge should happen in MR.new
mergeMaybe <- allocateMaybeTemp reg
(Merge.new hfs hbio caching alloc mergeLevel resolve runPaths rs)
Merge.abort
mr <- case mergeMaybe of
Nothing -> error "newMerge: merges can not be empty"
Just m -> allocateTemp reg (MR.new mergePolicy rs m) releaseRef
-- TODO: There currently is a resource management bug that happens if an
-- exception occurs after calling MR.new. In this case, all changes roll
-- back, so some of the runs in rs will live in the Levels structure at
-- their original places again. However, we passed their references to
-- the MergingRun, which gets aborted, releasing the run references.
-- Instead of passing the original references into newMerge, we have to
-- duplicate the ones that previously existed in the level and then
-- freeTemp the original ones. This way, on the happy path the result is
-- the same, but if an exception occurs, the original references do not
-- get released.
mr <- allocateTemp reg
(MR.new hfs hbio resolve caching alloc mergeLevel mergePolicy runPaths rs)
releaseRef
case confMergeSchedule of
Incremental -> pure ()
OneShot -> do
Expand Down
30 changes: 26 additions & 4 deletions src/Database/LSMTree/Internal/MergingRun.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,21 @@ import Control.Monad.Class.MonadThrow (MonadCatch (bracketOnError),
import Control.Monad.Primitive
import Control.RefCount
import Control.TempRegistry
import Data.Maybe (fromMaybe)
import Data.Primitive.MutVar
import Data.Primitive.PrimVar
import qualified Data.Vector as V
import Database.LSMTree.Internal.Assertions (assert)
import Database.LSMTree.Internal.Entry (NumEntries (..), unNumEntries)
import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue)
import Database.LSMTree.Internal.Merge (Merge, StepResult (..))
import qualified Database.LSMTree.Internal.Merge as Merge
import Database.LSMTree.Internal.Paths (RunFsPaths (..))
import Database.LSMTree.Internal.Run (Run)
import qualified Database.LSMTree.Internal.Run as Run
import Database.LSMTree.Internal.RunAcc (RunBloomFilterAlloc)
import System.FS.API (HasFS)
import System.FS.BlockIO.API (HasBlockIO)

data MergingRun m h = MergingRun {
mergePolicy :: !MergePolicyForLevel
Expand Down Expand Up @@ -106,23 +112,39 @@ instance NFData MergeKnownCompleted where
rnf MergeKnownCompleted = ()
rnf MergeMaybeCompleted = ()

{-# SPECIALISE new :: MergePolicyForLevel -> V.Vector (Ref (Run IO h)) -> Merge IO h -> IO (Ref (MergingRun IO h)) #-}
{-# SPECIALISE new :: HasFS IO h -> HasBlockIO IO h -> ResolveSerialisedValue -> Run.RunDataCaching -> RunBloomFilterAlloc -> Merge.Level -> MergePolicyForLevel -> RunFsPaths -> V.Vector (Ref (Run IO h)) -> IO (Ref (MergingRun IO h)) #-}
-- | Create a new merging run, returning a reference to it that must ultimately
-- be released via 'releaseRef' or by calling 'expectCompleted'.
--
-- The underlying merge is created inside this function by calling @Merge.new@
-- on the given runs, so callers do not construct (or abort) the merge
-- themselves. If @Merge.new@ returns 'Nothing', this function calls 'error'.
--
-- Takes over ownership of the references to the runs passed.
--
-- This function should be run with asynchronous exceptions masked to prevent
-- failing after some internal resources have already been created.
new ::
(MonadMVar m, MonadMask m, MonadSTM m, MonadST m)
=> MergePolicyForLevel
=> HasFS m h
-> HasBlockIO m h
-> ResolveSerialisedValue
-> Run.RunDataCaching
-> RunBloomFilterAlloc
-> Merge.Level
-> MergePolicyForLevel
-> RunFsPaths
-> V.Vector (Ref (Run m h))
-> Merge m h
-> m (Ref (MergingRun m h))
new mergePolicy runs merge = do
new hfs hbio resolve caching alloc mergeLevel mergePolicy runPaths runs = do
-- A 'Nothing' result from Merge.new is treated as a fatal error.
-- NOTE(review): the message is prefixed with "newMerge" although this
-- function is 'new' -- confirm whether that prefix is still intended.
merge <- fromMaybe (error "newMerge: merges can not be empty")
<$> Merge.new hfs hbio caching alloc mergeLevel resolve runPaths runs
-- Number of input runs and their combined size, both handed to unsafeNew.
let numInputRuns = NumRuns $ V.length runs
let numInputEntries = V.foldMap' Run.size runs
-- Spent-credits counter, initialised to 0.
spentCreditsVar <- SpentCreditsVar <$> newPrimVar 0
-- Built with the 'MergeMaybeCompleted' flag and an 'OngoingMerge' state that
-- holds the input runs, the credits counter and the freshly created merge.
unsafeNew mergePolicy numInputRuns numInputEntries MergeMaybeCompleted $
OngoingMerge runs spentCreditsVar merge

{-# SPECIALISE newCompleted :: MergePolicyForLevel -> NumRuns -> NumEntries -> Ref (Run IO h) -> IO (Ref (MergingRun IO h)) #-}
-- | Create a merging run that is already in the completed state, returning a
-- reference to it that must ultimately be released via 'releaseRef' or by
-- calling 'expectCompleted'.
newCompleted ::
(MonadMVar m, MonadMask m, MonadSTM m, MonadST m)
=> MergePolicyForLevel
Expand Down
11 changes: 4 additions & 7 deletions src/Database/LSMTree/Internal/Snapshot.hs
Original file line number Diff line number Diff line change
Expand Up @@ -339,14 +339,11 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve
SnapCompletedMerge run ->
MR.newCompleted mpfl nr ne run

SnapOngoingMerge runs spentCredits mergeLast -> do
SnapOngoingMerge runs spentCredits lvl -> do
rn <- uniqueToRunNumber <$> incrUniqCounter uc
mergeMaybe <- allocateMaybeTemp reg
(Merge.new hfs hbio caching alloc mergeLast resolve (mkPath rn) runs)
Merge.abort
mr <- case mergeMaybe of
Nothing -> error "openLevels: merges can not be empty"
Just m -> MR.new mpfl runs m
mr <- allocateTemp reg
(MR.new hfs hbio resolve caching alloc lvl mpfl (mkPath rn) runs)
releaseRef

-- When a snapshot is created, merge progress is lost, so we
-- have to redo merging work here. UnspentCredits and
Expand Down

0 comments on commit 3638038

Please sign in to comment.