diff --git a/src/Database/LSMTree/Internal.hs b/src/Database/LSMTree/Internal.hs index 5e8f6de16..e76d9e1c7 100644 --- a/src/Database/LSMTree/Internal.hs +++ b/src/Database/LSMTree/Internal.hs @@ -1198,6 +1198,7 @@ openSnapshot sesh label tableType override snap resolve = do snapLevels' <- openRuns reg hfs hbio conf (sessionUniqCounter seshEnv) snapDir actDir snapLevels -- Convert from the snapshot format, restoring merge progress in the process tableLevels <- fromSnapLevels reg hfs hbio conf (sessionUniqCounter seshEnv) resolve actDir snapLevels' + releaseRuns reg snapLevels' tableCache <- mkLevelsCache reg tableLevels newWith reg sesh seshEnv conf' am $! TableContent { diff --git a/src/Database/LSMTree/Internal/MergeSchedule.hs b/src/Database/LSMTree/Internal/MergeSchedule.hs index 1f1498237..403a6eb95 100644 --- a/src/Database/LSMTree/Internal/MergeSchedule.hs +++ b/src/Database/LSMTree/Internal/MergeSchedule.hs @@ -645,6 +645,13 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = where -- NOTE: @go@ is based on the @increment@ function from the -- @ScheduledMerges@ prototype. + -- + -- Releases the vector of runs. + go :: + LevelNo + -> V.Vector (Ref (Run m h)) + -> V.Vector (Level m h ) + -> m (V.Vector (Level m h)) go !ln rs (V.uncons -> Nothing) = do traceWith tr $ AtLevel ln TraceAddLevel -- Make a new level @@ -693,6 +700,7 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = ir' <- newMerge LevelLevelling Merge.LastLevel ln (rs' `V.snoc` r) pure $! Level ir' V.empty `V.cons` V.empty + -- Releases the incoming run. expectCompletedMerge :: LevelNo -> IncomingRun m h -> m (Ref (Run m h)) expectCompletedMerge ln ir = do r <- case ir of @@ -705,7 +713,7 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = TraceExpectCompletedMerge (Run.runFsPathsNumber r) pure r - -- Takes ownership of the runs passed. + -- Releases the runs. newMerge :: MergePolicyForLevel -> Merge.Level -> LevelNo @@ -717,7 +725,12 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = traceWith tr $ AtLevel ln $ TraceNewMergeSingleRun (Run.size r) (Run.runFsPathsNumber r) - pure (Single r) + -- We create a fresh reference and release the original one. + -- This will also make it easier to trace back where it was allocated. + ir <- Single <$> allocateTemp reg (dupRef r) releaseRef + freeTemp reg (releaseRef r) + pure ir + | otherwise = do assert (let l = V.length rs in l >= 2 && l <= 5) $ pure () !n <- incrUniqCounter uc @@ -726,19 +739,12 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = !runPaths = Paths.runPath root (uniqueToRunNumber n) traceWith tr $ AtLevel ln $ TraceNewMerge (V.map Run.size rs) (runNumber runPaths) caching alloc mergePolicy mergeLevel - -- TODO: There currently is a resource management bug that happens if an - -- exception occurs after calling MR.new. In this case, all changes roll - -- back, so some of the runs in rs will live in the Levels structure at - -- their original places again. However, we passed their references to - -- the MergingRun, which gets aborted, releasing the run references. - -- Instead of passing the original references into newMerge, we have to - -- duplicate the ones that previously existed in the level and then - -- freeTemp the original ones. This way, on the happy path the result is - -- the same, but if an exception occurs, the original references do not - -- get released. + -- The runs will end up inside the merging run, with fresh references. + -- The original references can be released (but only on the happy path). mr <- allocateTemp reg (MR.new hfs hbio resolve caching alloc mergeLevel mergePolicy runPaths rs) releaseRef + V.forM_ rs $ \r -> freeTemp reg (releaseRef r) case confMergeSchedule of Incremental -> pure () OneShot -> do diff --git a/src/Database/LSMTree/Internal/MergingRun.hs b/src/Database/LSMTree/Internal/MergingRun.hs index e4afab00e..214411ef0 100644 --- a/src/Database/LSMTree/Internal/MergingRun.hs +++ b/src/Database/LSMTree/Internal/MergingRun.hs @@ -125,7 +125,7 @@ instance NFData MergeKnownCompleted where -- | Create a new merging run, returning a reference to it that must ultimately -- be released via 'releaseRef'. -- --- Takes over ownership of the references to the runs passed. +-- Duplicates the supplied references to the runs. -- -- This function should be run with asynchronous exceptions masked to prevent -- failing after internal resources have already been created. @@ -141,14 +141,17 @@ new :: -> RunFsPaths -> V.Vector (Ref (Run m h)) -> m (Ref (MergingRun m h)) -new hfs hbio resolve caching alloc mergeLevel mergePolicy runPaths runs = do - merge <- fromMaybe (error "newMerge: merges can not be empty") - <$> Merge.new hfs hbio caching alloc mergeLevel resolve runPaths runs - let numInputRuns = NumRuns $ V.length runs - let numInputEntries = V.foldMap' Run.size runs - spentCreditsVar <- SpentCreditsVar <$> newPrimVar 0 - unsafeNew mergePolicy numInputRuns numInputEntries MergeMaybeCompleted $ - OngoingMerge runs spentCreditsVar merge +new hfs hbio resolve caching alloc mergeLevel mergePolicy runPaths inputRuns = + -- If creating the Merge fails, we must release the references again. + withTempRegistry $ \reg -> do + runs <- V.mapM (\r -> allocateTemp reg (dupRef r) releaseRef) inputRuns + merge <- fromMaybe (error "newMerge: merges can not be empty") + <$> Merge.new hfs hbio caching alloc mergeLevel resolve runPaths runs + let numInputRuns = NumRuns $ V.length runs + let numInputEntries = V.foldMap' Run.size runs + spentCreditsVar <- SpentCreditsVar <$> newPrimVar 0 + unsafeNew mergePolicy numInputRuns numInputEntries MergeMaybeCompleted $ + OngoingMerge runs spentCreditsVar merge {-# SPECIALISE newCompleted :: MergePolicyForLevel @@ -158,6 +161,11 @@ new hfs hbio resolve caching alloc mergeLevel mergePolicy runPaths runs = do -> IO (Ref (MergingRun IO h)) #-} -- | Create a merging run that is already in the completed state, returning a -- reference that must ultimately be released via 'releaseRef'. +-- +-- Duplicates the supplied reference to the run. +-- +-- This function should be run with asynchronous exceptions masked to prevent +-- failing after internal resources have already been created. newCompleted :: (MonadMVar m, MonadMask m, MonadSTM m, MonadST m) => MergePolicyForLevel @@ -165,9 +173,10 @@ newCompleted :: -> NumEntries -> Ref (Run m h) -> m (Ref (MergingRun m h)) -newCompleted mergePolicy numInputRuns numInputEntries run = do - unsafeNew mergePolicy numInputRuns numInputEntries MergeKnownCompleted $ - CompletedMerge run +newCompleted mergePolicy numInputRuns numInputEntries inputRun = do + bracketOnError (dupRef inputRun) releaseRef $ \run -> + unsafeNew mergePolicy numInputRuns numInputEntries MergeKnownCompleted $ + CompletedMerge run {-# INLINE unsafeNew #-} unsafeNew :: diff --git a/src/Database/LSMTree/Internal/Snapshot.hs b/src/Database/LSMTree/Internal/Snapshot.hs index d33fd8315..4d678ffb3 100644 --- a/src/Database/LSMTree/Internal/Snapshot.hs +++ b/src/Database/LSMTree/Internal/Snapshot.hs @@ -15,6 +15,7 @@ module Database.LSMTree.Internal.Snapshot ( -- * Runs , snapshotRuns , openRuns + , releaseRuns -- * Opening from levels snapshot format , fromSnapLevels -- * Hard links @@ -31,7 +32,7 @@ import Control.Monad.Class.MonadThrow (MonadMask) import Control.Monad.Primitive (PrimMonad) import Control.RefCount import Control.TempRegistry -import Data.Foldable (sequenceA_) +import Data.Foldable (sequenceA_, traverse_) import Data.Primitive.PrimVar import Data.Text (Text) import Data.Traversable (for) @@ -258,6 +259,8 @@ snapshotRuns reg hbio0 (NamedSnapshotDir targetDir) levels = do -- into @targetDir@ with new, unique names (using @uniqCounter@). Each set of -- (hard linked) files that represents a run is opened and verified, returning -- 'Run's as a result. +-- +-- The result must ultimately be released using 'releaseRuns'. openRuns :: (MonadMask m, MonadSTM m, MonadST m, MonadMVar m) => TempRegistry m @@ -287,6 +290,14 @@ openRuns releaseRef pure (SnapLevels levels') +{-# SPECIALISE releaseRuns :: + TempRegistry IO -> SnapLevels (Ref (Run IO h)) -> IO () + #-} +releaseRuns :: + (MonadMask m, MonadST m, MonadMVar m) + => TempRegistry m -> SnapLevels (Ref (Run m h)) -> m () +releaseRuns reg = traverse_ $ \r -> freeTemp reg (releaseRef r) + {------------------------------------------------------------------------------- Opening from levels snapshot format -------------------------------------------------------------------------------} @@ -302,6 +313,7 @@ openRuns -> SnapLevels (Ref (Run IO h)) -> IO (Levels IO h) #-} +-- | Duplicates runs and re-creates merging runs. fromSnapLevels :: forall m h. (MonadMask m, MonadMVar m, MonadSTM m, MonadST m) => TempRegistry m @@ -321,10 +333,8 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve fromSnapLevel :: LevelNo -> SnapLevel (Ref (Run m h)) -> m (Level m h) fromSnapLevel ln SnapLevel{..} = do incomingRun <- fromSnapIncomingRun snapIncoming - pure Level { - incomingRun - , residentRuns = snapResidentRuns - } + residentRuns <- V.mapM dupRun snapResidentRuns + pure Level {incomingRun , residentRuns} where caching = diskCachePolicyForLevel confDiskCachePolicy ln alloc = bloomFilterAllocForLevel conf ln @@ -332,8 +342,8 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve fromSnapIncomingRun :: SnapIncomingRun (Ref (Run m h)) -> m (IncomingRun m h) - fromSnapIncomingRun (SnapSingleRun run) = - pure (Single run) + fromSnapIncomingRun (SnapSingleRun run) = do + Single <$> dupRun run fromSnapIncomingRun (SnapMergingRun mpfl nr ne unspentCredits smrs) = do Merging <$> case smrs of SnapCompletedMerge run -> @@ -344,7 +354,6 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve mr <- allocateTemp reg (MR.new hfs hbio resolve caching alloc lvl mpfl (mkPath rn) runs) releaseRef - -- When a snapshot is created, merge progress is lost, so we -- have to redo merging work here. UnspentCredits and -- SpentCredits track how many credits were supplied before the @@ -354,6 +363,8 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve MR.supplyCredits (MR.Credits c) (creditThresholdForLevel conf ln) mr return mr + dupRun r = allocateTemp reg (dupRef r) releaseRef + {------------------------------------------------------------------------------- Hard links -------------------------------------------------------------------------------}