Skip to content

Commit

Permalink
fix Ref handling in addRunToLevels and fromSnapLevels
Browse files Browse the repository at this point in the history
  • Loading branch information
mheinzel committed Dec 17, 2024
1 parent e755b82 commit 69b32e6
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 32 deletions.
1 change: 1 addition & 0 deletions src/Database/LSMTree/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1198,6 +1198,7 @@ openSnapshot sesh label tableType override snap resolve = do
snapLevels' <- openRuns reg hfs hbio conf (sessionUniqCounter seshEnv) snapDir actDir snapLevels
-- Convert from the snapshot format, restoring merge progress in the process
tableLevels <- fromSnapLevels reg hfs hbio conf (sessionUniqCounter seshEnv) resolve actDir snapLevels'
releaseRuns reg snapLevels'

tableCache <- mkLevelsCache reg tableLevels
newWith reg sesh seshEnv conf' am $! TableContent {
Expand Down
30 changes: 18 additions & 12 deletions src/Database/LSMTree/Internal/MergeSchedule.hs
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,13 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
where
-- NOTE: @go@ is based on the @increment@ function from the
-- @ScheduledMerges@ prototype.
--
-- Releases the vector of runs.
go ::
LevelNo
-> V.Vector (Ref (Run m h))
-> V.Vector (Level m h )
-> m (V.Vector (Level m h))
go !ln rs (V.uncons -> Nothing) = do
traceWith tr $ AtLevel ln TraceAddLevel
-- Make a new level
Expand Down Expand Up @@ -693,6 +700,7 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
ir' <- newMerge LevelLevelling Merge.LastLevel ln (rs' `V.snoc` r)
pure $! Level ir' V.empty `V.cons` V.empty

-- Releases the incoming run.
expectCompletedMerge :: LevelNo -> IncomingRun m h -> m (Ref (Run m h))
expectCompletedMerge ln ir = do
r <- case ir of
Expand All @@ -705,7 +713,7 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
TraceExpectCompletedMerge (Run.runFsPathsNumber r)
pure r

-- Takes ownership of the runs passed.
-- Releases the runs.
newMerge :: MergePolicyForLevel
-> Merge.Level
-> LevelNo
Expand All @@ -717,7 +725,12 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
traceWith tr $ AtLevel ln $
TraceNewMergeSingleRun (Run.size r)
(Run.runFsPathsNumber r)
pure (Single r)
-- We create a fresh reference and release the original one.
-- This will also make it easier to trace back where it was allocated.
ir <- Single <$> allocateTemp reg (dupRef r) releaseRef
freeTemp reg (releaseRef r)
pure ir

| otherwise = do
assert (let l = V.length rs in l >= 2 && l <= 5) $ pure ()
!n <- incrUniqCounter uc
Expand All @@ -726,19 +739,12 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
!runPaths = Paths.runPath root (uniqueToRunNumber n)
traceWith tr $ AtLevel ln $
TraceNewMerge (V.map Run.size rs) (runNumber runPaths) caching alloc mergePolicy mergeLevel
-- TODO: There currently is a resource management bug that happens if an
-- exception occurs after calling MR.new. In this case, all changes roll
-- back, so some of the runs in rs will live in the Levels structure at
-- their original places again. However, we passed their references to
-- the MergingRun, which gets aborted, releasing the run references.
-- Instead of passing the original references into newMerge, we have to
-- duplicate the ones that previously existed in the level and then
-- freeTemp the original ones. This way, on the happy path the result is
-- the same, but if an exception occurs, the original references do not
-- get released.
-- The runs will end up inside the merging run, with fresh references.
-- The original references can be released (but only on the happy path).
mr <- allocateTemp reg
(MR.new hfs hbio resolve caching alloc mergeLevel mergePolicy runPaths rs)
releaseRef
V.forM_ rs $ \r -> freeTemp reg (releaseRef r)
case confMergeSchedule of
Incremental -> pure ()
OneShot -> do
Expand Down
33 changes: 21 additions & 12 deletions src/Database/LSMTree/Internal/MergingRun.hs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ instance NFData MergeKnownCompleted where
-- | Create a new merging run, returning a reference to it that must ultimately
-- be released via 'releaseRef'.
--
-- Takes over ownership of the references to the runs passed.
-- Duplicates the supplied references to the runs.
--
-- This function should be run with asynchronous exceptions masked to prevent
-- failing after internal resources have already been created.
Expand All @@ -141,14 +141,17 @@ new ::
-> RunFsPaths
-> V.Vector (Ref (Run m h))
-> m (Ref (MergingRun m h))
new hfs hbio resolve caching alloc mergeLevel mergePolicy runPaths runs = do
merge <- fromMaybe (error "newMerge: merges can not be empty")
<$> Merge.new hfs hbio caching alloc mergeLevel resolve runPaths runs
let numInputRuns = NumRuns $ V.length runs
let numInputEntries = V.foldMap' Run.size runs
spentCreditsVar <- SpentCreditsVar <$> newPrimVar 0
unsafeNew mergePolicy numInputRuns numInputEntries MergeMaybeCompleted $
OngoingMerge runs spentCreditsVar merge
new hfs hbio resolve caching alloc mergeLevel mergePolicy runPaths inputRuns =
    -- Each input reference is duplicated and the duplicate is registered with
    -- a temporary registry, so that the duplicates are released again if
    -- creating the merge fails.
    withTempRegistry $ \reg -> do
      ownedRuns <-
        V.mapM (\ref -> allocateTemp reg (dupRef ref) releaseRef) inputRuns
      merge <- fromMaybe (error "newMerge: merges can not be empty")
                 <$> Merge.new hfs hbio caching alloc mergeLevel resolve runPaths ownedRuns
      let numInputRuns    = NumRuns (V.length ownedRuns)
          numInputEntries = V.foldMap' Run.size ownedRuns
      spentCreditsVar <- SpentCreditsVar <$> newPrimVar 0
      -- The merging run holds the duplicated references, not the caller's.
      unsafeNew mergePolicy numInputRuns numInputEntries MergeMaybeCompleted
        (OngoingMerge ownedRuns spentCreditsVar merge)

{-# SPECIALISE newCompleted ::
MergePolicyForLevel
Expand All @@ -158,16 +161,22 @@ new hfs hbio resolve caching alloc mergeLevel mergePolicy runPaths runs = do
-> IO (Ref (MergingRun IO h)) #-}
-- | Create a merging run that is already in the completed state, returning a
-- reference that must ultimately be released via 'releaseRef'.
--
-- Duplicates the supplied reference to the run.
--
-- This function should be run with asynchronous exceptions masked to prevent
-- failing after internal resources have already been created.
newCompleted ::
(MonadMVar m, MonadMask m, MonadSTM m, MonadST m)
=> MergePolicyForLevel
-> NumRuns
-> NumEntries
-> Ref (Run m h)
-> m (Ref (MergingRun m h))
newCompleted mergePolicy numInputRuns numInputEntries run = do
unsafeNew mergePolicy numInputRuns numInputEntries MergeKnownCompleted $
CompletedMerge run
newCompleted mergePolicy numInputRuns numInputEntries inputRun =
    -- Take a reference of our own first; if constructing the merging run
    -- throws, 'bracketOnError' releases that duplicate again.
    bracketOnError (dupRef inputRun) releaseRef $ \ownedRun ->
      unsafeNew mergePolicy numInputRuns numInputEntries MergeKnownCompleted
        (CompletedMerge ownedRun)

{-# INLINE unsafeNew #-}
unsafeNew ::
Expand Down
27 changes: 19 additions & 8 deletions src/Database/LSMTree/Internal/Snapshot.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ module Database.LSMTree.Internal.Snapshot (
-- * Runs
, snapshotRuns
, openRuns
, releaseRuns
-- * Opening from levels snapshot format
, fromSnapLevels
-- * Hard links
Expand All @@ -31,7 +32,7 @@ import Control.Monad.Class.MonadThrow (MonadMask)
import Control.Monad.Primitive (PrimMonad)
import Control.RefCount
import Control.TempRegistry
import Data.Foldable (sequenceA_)
import Data.Foldable (sequenceA_, traverse_)
import Data.Primitive.PrimVar
import Data.Text (Text)
import Data.Traversable (for)
Expand Down Expand Up @@ -258,6 +259,8 @@ snapshotRuns reg hbio0 (NamedSnapshotDir targetDir) levels = do
-- into @targetDir@ with new, unique names (using @uniqCounter@). Each set of
-- (hard linked) files that represents a run is opened and verified, returning
-- 'Run's as a result.
--
-- The result must ultimately be released using 'releaseRuns'.
openRuns ::
(MonadMask m, MonadSTM m, MonadST m, MonadMVar m)
=> TempRegistry m
Expand Down Expand Up @@ -287,6 +290,14 @@ openRuns
releaseRef
pure (SnapLevels levels')

{-# SPECIALISE releaseRuns ::
     TempRegistry IO -> SnapLevels (Ref (Run IO h)) -> IO ()
  #-}
-- | Release every run reference contained in the given 'SnapLevels'.
--
-- Each reference is released with 'releaseRef', routed through 'freeTemp' on
-- the supplied registry. This is the counterpart to 'openRuns', whose result
-- must ultimately be released using this function.
releaseRuns ::
     (MonadMask m, MonadST m, MonadMVar m)
  => TempRegistry m -> SnapLevels (Ref (Run m h)) -> m ()
releaseRuns reg snapLevels =
    traverse_ (freeTemp reg . releaseRef) snapLevels

{-------------------------------------------------------------------------------
Opening from levels snapshot format
-------------------------------------------------------------------------------}
Expand All @@ -302,6 +313,7 @@ openRuns
-> SnapLevels (Ref (Run IO h))
-> IO (Levels IO h)
#-}
-- | Duplicates runs and re-creates merging runs.
fromSnapLevels ::
forall m h. (MonadMask m, MonadMVar m, MonadSTM m, MonadST m)
=> TempRegistry m
Expand All @@ -321,19 +333,17 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve
fromSnapLevel :: LevelNo -> SnapLevel (Ref (Run m h)) -> m (Level m h)
fromSnapLevel ln SnapLevel{..} = do
incomingRun <- fromSnapIncomingRun snapIncoming
pure Level {
incomingRun
, residentRuns = snapResidentRuns
}
residentRuns <- V.mapM dupRun snapResidentRuns
pure Level {incomingRun , residentRuns}
where
caching = diskCachePolicyForLevel confDiskCachePolicy ln
alloc = bloomFilterAllocForLevel conf ln

fromSnapIncomingRun ::
SnapIncomingRun (Ref (Run m h))
-> m (IncomingRun m h)
fromSnapIncomingRun (SnapSingleRun run) =
pure (Single run)
fromSnapIncomingRun (SnapSingleRun run) = do
Single <$> dupRun run
fromSnapIncomingRun (SnapMergingRun mpfl nr ne unspentCredits smrs) = do
Merging <$> case smrs of
SnapCompletedMerge run ->
Expand All @@ -344,7 +354,6 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve
mr <- allocateTemp reg
(MR.new hfs hbio resolve caching alloc lvl mpfl (mkPath rn) runs)
releaseRef

-- When a snapshot is created, merge progress is lost, so we
-- have to redo merging work here. UnspentCredits and
-- SpentCredits track how many credits were supplied before the
Expand All @@ -354,6 +363,8 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve
MR.supplyCredits (MR.Credits c) (creditThresholdForLevel conf ln) mr
return mr

dupRun r = allocateTemp reg (dupRef r) releaseRef

{-------------------------------------------------------------------------------
Hard links
-------------------------------------------------------------------------------}
Expand Down

0 comments on commit 69b32e6

Please sign in to comment.