From 4d7c8832c2e360718d54432f233efc9ba86c1685 Mon Sep 17 00:00:00 2001 From: Joris Dral Date: Tue, 24 Dec 2024 09:54:29 +0100 Subject: [PATCH 1/4] Use `ActionRegistry` instead of `TempRegistry` everywhere --- src-control/Control/ActionRegistry.hs | 2 - src/Database/LSMTree/Internal.hs | 62 +++++----- .../LSMTree/Internal/MergeSchedule.hs | 112 +++++++++--------- src/Database/LSMTree/Internal/MergingRun.hs | 10 +- src/Database/LSMTree/Internal/Snapshot.hs | 40 +++---- test/Test/Database/LSMTree/Internal/Run.hs | 4 +- 6 files changed, 114 insertions(+), 116 deletions(-) diff --git a/src-control/Control/ActionRegistry.hs b/src-control/Control/ActionRegistry.hs index 7819c2400..ab85bb9cc 100644 --- a/src-control/Control/ActionRegistry.hs +++ b/src-control/Control/ActionRegistry.hs @@ -34,8 +34,6 @@ import Data.List.NonEmpty (NonEmpty (..)) import qualified Data.List.NonEmpty as NE import Data.Primitive.MutVar --- TODO: replace TempRegistry by ActionRegistry - -- TODO: add tests using fs-sim/io-sim to make sure exception safety is -- guaranteed. 
diff --git a/src/Database/LSMTree/Internal.hs b/src/Database/LSMTree/Internal.hs index 7d5dde759..c755e4027 100644 --- a/src/Database/LSMTree/Internal.hs +++ b/src/Database/LSMTree/Internal.hs @@ -83,7 +83,6 @@ import Control.Monad.Class.MonadST (MonadST (..)) import Control.Monad.Class.MonadThrow import Control.Monad.Primitive import Control.RefCount -import Control.TempRegistry import Control.Tracer import Data.Arena (ArenaManager, newArenaManager) import Data.Either (fromRight) @@ -702,12 +701,12 @@ new :: new sesh conf = do traceWith (sessionTracer sesh) TraceNewTable withOpenSession sesh $ \seshEnv -> - withTempRegistry $ \reg -> do + withActionRegistry $ \reg -> do am <- newArenaManager blobpath <- Paths.tableBlobPath (sessionRoot seshEnv) <$> incrUniqCounter (sessionUniqCounter seshEnv) tableWriteBufferBlobs - <- allocateTemp reg + <- withRollback reg (WBB.new (sessionHasFS seshEnv) blobpath) releaseRef let tableWriteBuffer = WB.empty @@ -722,7 +721,7 @@ new sesh conf = do newWith reg sesh seshEnv conf am tc {-# SPECIALISE newWith :: - TempRegistry IO + ActionRegistry IO -> Session IO h -> SessionEnv IO h -> TableConfig @@ -730,8 +729,8 @@ new sesh conf = do -> TableContent IO h -> IO (Table IO h) #-} newWith :: - (MonadSTM m, MonadMVar m) - => TempRegistry m + (MonadSTM m, MonadMVar m, PrimMonad m) + => ActionRegistry m -> Session m h -> SessionEnv m h -> TableConfig @@ -754,8 +753,9 @@ newWith reg sesh seshEnv conf !am !tc = do let !tid = uniqueToWord64 tableId !t = Table conf tableVar am tr tid sesh -- Track the current table - freeTemp reg $ modifyMVar_ (sessionOpenTables seshEnv) - $ pure . Map.insert (uniqueToWord64 tableId) t + delayedCommit reg $ + modifyMVar_ (sessionOpenTables seshEnv) $ + pure . Map.insert (uniqueToWord64 tableId) t pure $! 
t {-# SPECIALISE close :: Table IO h -> IO () #-} @@ -766,7 +766,7 @@ close :: -> m () close t = do traceWith (tableTracer t) TraceCloseTable - modifyWithTempRegistry_ + modifyWithActionRegistry_ (RW.unsafeAcquireWriteAccess (tableState t)) (atomically . RW.unsafeReleaseWriteAccess (tableState t)) $ \reg -> \case TableClosed -> pure TableClosed @@ -774,7 +774,7 @@ close t = do -- Since we have a write lock on the table state, we know that we are the -- only thread currently closing the table. We can safely make the session -- forget about this table. - freeTemp reg (tableSessionUntrackTable (tableId t) thEnv) + delayedCommit reg (tableSessionUntrackTable (tableId t) thEnv) RW.withWriteAccess_ (tableContent thEnv) $ \tc -> do releaseTableContent reg tc pure tc @@ -868,7 +868,7 @@ updates resolve es t = do let conf = tableConfig t withOpenTable t $ \thEnv -> do let hfs = tableHasFS thEnv - modifyWithTempRegistry_ + modifyWithActionRegistry_ (RW.unsafeAcquireWriteAccess (tableContent thEnv)) (atomically . RW.unsafeReleaseWriteAccess (tableContent thEnv)) $ \reg -> do updatesWithInterleavedFlushes @@ -1005,10 +1005,10 @@ newCursor !offsetKey t = withOpenTable t $ \thEnv -> do -- We acquire a read-lock on the session open-state to prevent races, see -- 'sessionOpenTables'. withOpenSession cursorSession $ \_ -> do - withTempRegistry $ \reg -> do + withActionRegistry $ \reg -> do (wb, wbblobs, cursorRuns) <- dupTableContent reg (tableContent thEnv) cursorReaders <- - allocateMaybeTemp reg + withRollbackMaybe reg (Readers.new offsetKey (Just (wb, wbblobs)) cursorRuns) Readers.close let cursorWBB = wbblobs @@ -1017,9 +1017,9 @@ newCursor !offsetKey t = withOpenTable t $ \thEnv -> do -- Track cursor, but careful: If now an exception is raised, all -- resources get freed by the registry, so if the session still -- tracks 'cursor' (which is 'CursorOpen'), it later double frees. - -- Therefore, we only track the cursor if 'withTempRegistry' exits - -- successfully, i.e. 
using 'freeTemp'. - freeTemp reg $ + -- Therefore, we only track the cursor if 'withActionRegistry' exits + -- successfully, i.e. using 'delayedCommit'. + delayedCommit reg $ modifyMVar_ (sessionOpenCursors cursorSessionEnv) $ pure . Map.insert cursorId cursor pure $! cursor @@ -1030,10 +1030,10 @@ newCursor !offsetKey t = withOpenTable t $ \thEnv -> do RW.withReadAccess contentVar $ \content -> do let !wb = tableWriteBuffer content !wbblobs = tableWriteBufferBlobs content - wbblobs' <- allocateTemp reg (dupRef wbblobs) releaseRef + wbblobs' <- withRollback reg (dupRef wbblobs) releaseRef let runs = cachedRuns (tableCache content) runs' <- V.forM runs $ \r -> - allocateTemp reg (dupRef r) releaseRef + withRollback reg (dupRef r) releaseRef pure (wb, wbblobs', runs') {-# SPECIALISE closeCursor :: Cursor IO h -> IO () #-} @@ -1044,20 +1044,20 @@ closeCursor :: -> m () closeCursor Cursor {..} = do traceWith cursorTracer $ TraceCloseCursor - modifyWithTempRegistry_ (takeMVar cursorState) (putMVar cursorState) $ \reg -> \case + modifyWithActionRegistry_ (takeMVar cursorState) (putMVar cursorState) $ \reg -> \case CursorClosed -> return CursorClosed CursorOpen CursorEnv {..} -> do -- This should be safe-ish, but it's still not ideal, because it doesn't -- rule out sync exceptions in the cleanup operations. -- In that case, the cursor ends up closed, but resources might not have -- been freed. Probably better than the other way around, though. - freeTemp reg $ + delayedCommit reg $ modifyMVar_ (sessionOpenCursors cursorSessionEnv) $ pure . Map.delete cursorId - forM_ cursorReaders $ freeTemp reg . Readers.close - V.forM_ cursorRuns $ freeTemp reg . releaseRef - freeTemp reg (releaseRef cursorWBB) + forM_ cursorReaders $ delayedCommit reg . Readers.close + V.forM_ cursorRuns $ delayedCommit reg . 
releaseRef + delayedCommit reg (releaseRef cursorWBB) return CursorClosed {-# SPECIALISE readCursor :: @@ -1145,7 +1145,7 @@ createSnapshot resolve snap label tableType t = do traceWith (tableTracer t) $ TraceSnapshot snap let conf = tableConfig t withOpenTable t $ \thEnv -> - withTempRegistry $ \reg -> do -- TODO: use the temp registry for all side effects + withActionRegistry $ \reg -> do -- TODO: use the temp registry for all side effects let hfs = tableHasFS thEnv hbio = tableHasBlockIO thEnv @@ -1158,13 +1158,13 @@ createSnapshot resolve snap label tableType t = do else -- we assume the snapshots directory already exists, so we just have -- to create the directory for this specific snapshot. - allocateTemp reg + withRollback_ reg (FS.createDirectory hfs (Paths.getNamedSnapshotDir snapDir)) - (\_ -> FS.removeDirectoryRecursive hfs (Paths.getNamedSnapshotDir snapDir)) + (FS.removeDirectoryRecursive hfs (Paths.getNamedSnapshotDir snapDir)) -- For the temporary implementation it is okay to just flush the buffer -- before taking the snapshot. - content <- modifyWithTempRegistry + content <- modifyWithActionRegistry (RW.unsafeAcquireWriteAccess (tableContent thEnv)) (atomically . 
RW.unsafeReleaseWriteAccess (tableContent thEnv)) $ \innerReg content -> do @@ -1223,7 +1223,7 @@ openSnapshot :: openSnapshot sesh label tableType override snap resolve = do traceWith (sessionTracer sesh) $ TraceOpenSnapshot snap override withOpenSession sesh $ \seshEnv -> do - withTempRegistry $ \reg -> do + withActionRegistry $ \reg -> do let hfs = sessionHasFS seshEnv hbio = sessionHasBlockIO seshEnv @@ -1250,7 +1250,7 @@ openSnapshot sesh label tableType override snap resolve = do am <- newArenaManager blobpath <- Paths.tableBlobPath (sessionRoot seshEnv) <$> incrUniqCounter (sessionUniqCounter seshEnv) - tableWriteBufferBlobs <- allocateTemp reg (WBB.new hfs blobpath) + tableWriteBufferBlobs <- withRollback reg (WBB.new hfs blobpath) releaseRef let actDir = Paths.activeDir (sessionRoot seshEnv) @@ -1331,7 +1331,7 @@ duplicate t@Table{..} = do -- We acquire a read-lock on the session open-state to prevent races, see -- 'sessionOpenTables'. withOpenSession tableSession $ \_ -> do - withTempRegistry $ \reg -> do + withActionRegistry $ \reg -> do -- The table contents escape the read access, but we just added references -- to each run so it is safe. content <- RW.withReadAccess tableContent (duplicateTableContent reg) @@ -1384,7 +1384,7 @@ unions ts = do -- We acquire a read-lock on the session open-state to prevent races, see -- 'sessionOpenTables'. 
- modifyWithTempRegistry + modifyWithActionRegistry (atomically $ RW.unsafeAcquireReadAccess (sessionState sesh)) (\_ -> atomically $ RW.unsafeReleaseReadAccess (sessionState sesh)) $ \reg -> \case SessionClosed -> throwIO ErrSessionClosed diff --git a/src/Database/LSMTree/Internal/MergeSchedule.hs b/src/Database/LSMTree/Internal/MergeSchedule.hs index 85cf9bd46..537cccb4d 100644 --- a/src/Database/LSMTree/Internal/MergeSchedule.hs +++ b/src/Database/LSMTree/Internal/MergeSchedule.hs @@ -27,13 +27,13 @@ module Database.LSMTree.Internal.MergeSchedule ( , creditThresholdForLevel ) where +import Control.ActionRegistry import Control.Concurrent.Class.MonadMVar.Strict import Control.Monad.Class.MonadST (MonadST) import Control.Monad.Class.MonadSTM (MonadSTM (..)) import Control.Monad.Class.MonadThrow (MonadMask, MonadThrow (..)) import Control.Monad.Primitive import Control.RefCount -import Control.TempRegistry import Control.Tracer import Data.BloomFilter (Bloom) import Data.Foldable (fold) @@ -117,26 +117,26 @@ data TableContent m h = TableContent { , tableCache :: !(LevelsCache m h) } -{-# SPECIALISE duplicateTableContent :: TempRegistry IO -> TableContent IO h -> IO (TableContent IO h) #-} +{-# SPECIALISE duplicateTableContent :: ActionRegistry IO -> TableContent IO h -> IO (TableContent IO h) #-} duplicateTableContent :: - (PrimMonad m, MonadMask m, MonadMVar m) - => TempRegistry m + (PrimMonad m, MonadMask m) + => ActionRegistry m -> TableContent m h -> m (TableContent m h) duplicateTableContent reg (TableContent wb wbb levels cache) = do - wbb' <- allocateTemp reg (dupRef wbb) releaseRef + wbb' <- withRollback reg (dupRef wbb) releaseRef levels' <- duplicateLevels reg levels cache' <- duplicateLevelsCache reg cache return $! 
TableContent wb wbb' levels' cache' -{-# SPECIALISE releaseTableContent :: TempRegistry IO -> TableContent IO h -> IO () #-} +{-# SPECIALISE releaseTableContent :: ActionRegistry IO -> TableContent IO h -> IO () #-} releaseTableContent :: - (PrimMonad m, MonadMask m, MonadMVar m) - => TempRegistry m + (PrimMonad m, MonadMask m) + => ActionRegistry m -> TableContent m h -> m () releaseTableContent reg (TableContent _wb wbb levels cache) = do - freeTemp reg (releaseRef wbb) + delayedCommit reg (releaseRef wbb) releaseLevels reg levels releaseLevelsCache reg cache @@ -166,7 +166,7 @@ data LevelsCache m h = LevelsCache_ { } {-# SPECIALISE mkLevelsCache :: - TempRegistry IO + ActionRegistry IO -> Levels IO h -> IO (LevelsCache IO h) #-} -- | Flatten the argument 'Level's into a single vector of runs, including all @@ -174,13 +174,13 @@ data LevelsCache m h = LevelsCache_ { -- 'LevelsCache'. The cache will take a reference for each of its runs. mkLevelsCache :: forall m h. (PrimMonad m, MonadMVar m, MonadMask m) - => TempRegistry m + => ActionRegistry m -> Levels m h -> m (LevelsCache m h) mkLevelsCache reg lvls = do rs <- foldRunAndMergeM (fmap V.singleton . dupRun) - (\mr -> allocateTemp reg (MR.duplicateRuns mr) (V.mapM_ releaseRef)) + (\mr -> withRollback reg (MR.duplicateRuns mr) (V.mapM_ releaseRef)) lvls pure $! LevelsCache_ { cachedRuns = rs @@ -189,7 +189,7 @@ mkLevelsCache reg lvls = do , cachedKOpsFiles = mapStrict (\(DeRef r) -> Run.runKOpsFile r) rs } where - dupRun r = allocateTemp reg (dupRef r) releaseRef + dupRun r = withRollback reg (dupRef r) releaseRef -- TODO: this is not terribly performant, but it is also not sure if we are -- going to need this in the end. We might get rid of the LevelsCache. @@ -207,7 +207,7 @@ mkLevelsCache reg lvls = do (incoming <>) . 
fold <$> V.forM rs k1 {-# SPECIALISE rebuildCache :: - TempRegistry IO + ActionRegistry IO -> LevelsCache IO h -> Levels IO h -> IO (LevelsCache IO h) #-} @@ -233,7 +233,7 @@ mkLevelsCache reg lvls = do -- Lookups should no invalidate blob erferences. rebuildCache :: (PrimMonad m, MonadMVar m, MonadMask m) - => TempRegistry m + => ActionRegistry m -> LevelsCache m h -- ^ old cache -> Levels m h -- ^ new levels -> m (LevelsCache m h) -- ^ new cache @@ -242,31 +242,31 @@ rebuildCache reg oldCache newLevels = do mkLevelsCache reg newLevels {-# SPECIALISE duplicateLevelsCache :: - TempRegistry IO + ActionRegistry IO -> LevelsCache IO h -> IO (LevelsCache IO h) #-} duplicateLevelsCache :: - (PrimMonad m, MonadMask m, MonadMVar m) - => TempRegistry m + (PrimMonad m, MonadMask m) + => ActionRegistry m -> LevelsCache m h -> m (LevelsCache m h) duplicateLevelsCache reg cache = do rs' <- V.forM (cachedRuns cache) $ \r -> - allocateTemp reg (dupRef r) releaseRef + withRollback reg (dupRef r) releaseRef return cache { cachedRuns = rs' } {-# SPECIALISE releaseLevelsCache :: - TempRegistry IO + ActionRegistry IO -> LevelsCache IO h -> IO () #-} releaseLevelsCache :: - (PrimMonad m, MonadMVar m, MonadMask m) - => TempRegistry m + (PrimMonad m, MonadMask m) + => ActionRegistry m -> LevelsCache m h -> m () releaseLevelsCache reg cache = V.forM_ (cachedRuns cache) $ \r -> - freeTemp reg (releaseRef r) + delayedCommit reg (releaseRef r) {------------------------------------------------------------------------------- Levels, runs and ongoing merges @@ -293,52 +293,52 @@ mergePolicyForLevel MergePolicyLazyLevelling (LevelNo n) nextLevels | V.null nextLevels = LevelLevelling -- levelling on last level | otherwise = LevelTiering -{-# SPECIALISE duplicateLevels :: TempRegistry IO -> Levels IO h -> IO (Levels IO h) #-} +{-# SPECIALISE duplicateLevels :: ActionRegistry IO -> Levels IO h -> IO (Levels IO h) #-} duplicateLevels :: - (PrimMonad m, MonadMVar m, MonadMask m) - => TempRegistry m 
+ (PrimMonad m, MonadMask m) + => ActionRegistry m -> Levels m h -> m (Levels m h) duplicateLevels reg levels = V.forM levels $ \Level {incomingRun, residentRuns} -> do incomingRun' <- duplicateIncomingRun reg incomingRun residentRuns' <- V.forM residentRuns $ \r -> - allocateTemp reg (dupRef r) releaseRef + withRollback reg (dupRef r) releaseRef return $! Level { incomingRun = incomingRun', residentRuns = residentRuns' } -{-# SPECIALISE releaseLevels :: TempRegistry IO -> Levels IO h -> IO () #-} +{-# SPECIALISE releaseLevels :: ActionRegistry IO -> Levels IO h -> IO () #-} releaseLevels :: - (PrimMonad m, MonadMVar m, MonadMask m) - => TempRegistry m + (PrimMonad m, MonadMask m) + => ActionRegistry m -> Levels m h -> m () releaseLevels reg levels = V.forM_ levels $ \Level {incomingRun, residentRuns} -> do releaseIncomingRun reg incomingRun - V.mapM_ (freeTemp reg . releaseRef) residentRuns + V.mapM_ (delayedCommit reg . releaseRef) residentRuns -{-# SPECIALISE duplicateIncomingRun :: TempRegistry IO -> IncomingRun IO h -> IO (IncomingRun IO h) #-} +{-# SPECIALISE duplicateIncomingRun :: ActionRegistry IO -> IncomingRun IO h -> IO (IncomingRun IO h) #-} duplicateIncomingRun :: - (PrimMonad m, MonadMask m, MonadMVar m) - => TempRegistry m + (PrimMonad m, MonadMask m) + => ActionRegistry m -> IncomingRun m h -> m (IncomingRun m h) duplicateIncomingRun reg (Single r) = - Single <$> allocateTemp reg (dupRef r) releaseRef + Single <$> withRollback reg (dupRef r) releaseRef duplicateIncomingRun reg (Merging mp mr) = - Merging mp <$> allocateTemp reg (dupRef mr) releaseRef + Merging mp <$> withRollback reg (dupRef mr) releaseRef -{-# SPECIALISE releaseIncomingRun :: TempRegistry IO -> IncomingRun IO h -> IO () #-} +{-# SPECIALISE releaseIncomingRun :: ActionRegistry IO -> IncomingRun IO h -> IO () #-} releaseIncomingRun :: - (PrimMonad m, MonadMask m, MonadMVar m) - => TempRegistry m + (PrimMonad m, MonadMask m) + => ActionRegistry m -> IncomingRun m h -> m () 
-releaseIncomingRun reg (Single r) = freeTemp reg (releaseRef r) -releaseIncomingRun reg (Merging _ mr) = freeTemp reg (releaseRef mr) +releaseIncomingRun reg (Single r) = delayedCommit reg (releaseRef r) +releaseIncomingRun reg (Merging _ mr) = delayedCommit reg (releaseRef mr) {-# SPECIALISE iforLevelM_ :: Levels IO h -> (LevelNo -> Level IO h -> IO ()) -> IO () #-} iforLevelM_ :: Monad m => Levels m h -> (LevelNo -> Level m h -> m ()) -> m () @@ -357,7 +357,7 @@ iforLevelM_ lvls k = V.iforM_ lvls $ \i lvl -> k (LevelNo (i + 1)) lvl -> SessionRoot -> UniqCounter IO -> V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob) - -> TempRegistry IO + -> ActionRegistry IO -> TableContent IO h -> IO (TableContent IO h) #-} -- | A single batch of updates can fill up the write buffer multiple times. We @@ -395,7 +395,7 @@ updatesWithInterleavedFlushes :: -> SessionRoot -> UniqCounter m -> V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob) - -> TempRegistry m + -> ActionRegistry m -> TableContent m h -> m (TableContent m h) updatesWithInterleavedFlushes tr conf resolve hfs hbio root uc es reg tc = do @@ -474,7 +474,7 @@ addWriteBufferEntries hfs f wbblobs maxn = -> HasBlockIO IO h -> SessionRoot -> UniqCounter IO - -> TempRegistry IO + -> ActionRegistry IO -> TableContent IO h -> IO (TableContent IO h) #-} -- | Flush the write buffer to disk, regardless of whether it is full or not. 
@@ -490,7 +490,7 @@ flushWriteBuffer :: -> HasBlockIO m h -> SessionRoot -> UniqCounter m - -> TempRegistry m + -> ActionRegistry m -> TableContent m h -> m (TableContent m h) flushWriteBuffer tr conf@TableConfig{confDiskCachePolicy} @@ -504,7 +504,7 @@ flushWriteBuffer tr conf@TableConfig{confDiskCachePolicy} !alloc = bloomFilterAllocForLevel conf l !path = Paths.runPath root (uniqueToRunNumber n) traceWith tr $ AtLevel l $ TraceFlushWriteBuffer size (runNumber path) cache alloc - r <- allocateTemp reg + r <- withRollback reg (Run.fromWriteBuffer hfs hbio cache alloc @@ -512,8 +512,8 @@ flushWriteBuffer tr conf@TableConfig{confDiskCachePolicy} (tableWriteBuffer tc) (tableWriteBufferBlobs tc)) releaseRef - freeTemp reg (releaseRef (tableWriteBufferBlobs tc)) - wbblobs' <- allocateTemp reg (WBB.new hfs (Paths.tableBlobPath root n)) + delayedCommit reg (releaseRef (tableWriteBufferBlobs tc)) + wbblobs' <- withRollback reg (WBB.new hfs (Paths.tableBlobPath root n)) releaseRef levels' <- addRunToLevels tr conf resolve hfs hbio root uc r reg (tableLevels tc) tableCache' <- rebuildCache reg (tableCache tc) levels' @@ -533,7 +533,7 @@ flushWriteBuffer tr conf@TableConfig{confDiskCachePolicy} -> SessionRoot -> UniqCounter IO -> Ref (Run IO h) - -> TempRegistry IO + -> ActionRegistry IO -> Levels IO h -> IO (Levels IO h) #-} -- | Add a run to the levels, and propagate merges. 
@@ -551,7 +551,7 @@ addRunToLevels :: -> SessionRoot -> UniqCounter m -> Ref (Run m h) - -> TempRegistry m + -> ActionRegistry m -> Levels m h -> m (Levels m h) addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = do @@ -620,8 +620,8 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = r <- case ir of Single r -> pure r Merging _ mr -> do - r <- allocateTemp reg (MR.expectCompleted mr) releaseRef - freeTemp reg (releaseRef mr) + r <- withRollback reg (MR.expectCompleted mr) releaseRef + delayedCommit reg (releaseRef mr) pure r traceWith tr $ AtLevel ln $ TraceExpectCompletedMerge (Run.runFsPathsNumber r) @@ -641,8 +641,8 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = (Run.runFsPathsNumber r) -- We create a fresh reference and release the original one. -- This will also make it easier to trace back where it was allocated. - ir <- Single <$> allocateTemp reg (dupRef r) releaseRef - freeTemp reg (releaseRef r) + ir <- Single <$> withRollback reg (dupRef r) releaseRef + delayedCommit reg (releaseRef r) pure ir | otherwise = do @@ -655,10 +655,10 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = TraceNewMerge (V.map Run.size rs) (runNumber runPaths) caching alloc mergePolicy mergeLevel -- The runs will end up inside the merging run, with fresh references. -- The original references can be released (but only on the happy path). 
- mr <- allocateTemp reg + mr <- withRollback reg (MR.new hfs hbio resolve caching alloc mergeLevel runPaths rs) releaseRef - V.forM_ rs $ \r -> freeTemp reg (releaseRef r) + V.forM_ rs $ \r -> delayedCommit reg (releaseRef r) case confMergeSchedule of Incremental -> pure () OneShot -> do diff --git a/src/Database/LSMTree/Internal/MergingRun.hs b/src/Database/LSMTree/Internal/MergingRun.hs index cf95aff51..6e4dfac45 100644 --- a/src/Database/LSMTree/Internal/MergingRun.hs +++ b/src/Database/LSMTree/Internal/MergingRun.hs @@ -23,6 +23,7 @@ module Database.LSMTree.Internal.MergingRun ( , MergeKnownCompleted (..) ) where +import Control.ActionRegistry import Control.Concurrent.Class.MonadMVar.Strict import Control.DeepSeq (NFData (..)) import Control.Monad (void, when) @@ -32,7 +33,6 @@ import Control.Monad.Class.MonadThrow (MonadCatch (bracketOnError), MonadMask) import Control.Monad.Primitive import Control.RefCount -import Control.TempRegistry import Data.Maybe (fromMaybe) import Data.Primitive.MutVar import Data.Primitive.PrimVar @@ -140,8 +140,8 @@ new :: -> m (Ref (MergingRun m h)) new hfs hbio resolve caching alloc mergeLevel runPaths inputRuns = -- If creating the Merge fails, we must release the references again. - withTempRegistry $ \reg -> do - runs <- V.mapM (\r -> allocateTemp reg (dupRef r) releaseRef) inputRuns + withActionRegistry $ \reg -> do + runs <- V.mapM (\r -> withRollback reg (dupRef r) releaseRef) inputRuns merge <- fromMaybe (error "newMerge: merges can not be empty") <$> Merge.new hfs hbio caching alloc mergeLevel resolve runPaths runs let numInputRuns = NumRuns $ V.length runs @@ -219,8 +219,8 @@ duplicateRuns (DeRef mr) = -- does not get completed concurrently before we are done. 
withMVar (mergeState mr) $ \case CompletedMerge r -> V.singleton <$> dupRef r - OngoingMerge rs _ _ -> withTempRegistry $ \reg -> - V.mapM (\r -> allocateTemp reg (dupRef r) releaseRef) rs + OngoingMerge rs _ _ -> withActionRegistry $ \reg -> + V.mapM (\r -> withRollback reg (dupRef r) releaseRef) rs {------------------------------------------------------------------------------- Credits diff --git a/src/Database/LSMTree/Internal/Snapshot.hs b/src/Database/LSMTree/Internal/Snapshot.hs index c6307318f..b94492f06 100644 --- a/src/Database/LSMTree/Internal/Snapshot.hs +++ b/src/Database/LSMTree/Internal/Snapshot.hs @@ -23,6 +23,7 @@ module Database.LSMTree.Internal.Snapshot ( , hardLinkRunFiles ) where +import Control.ActionRegistry import Control.Concurrent.Class.MonadMVar.Strict import Control.Concurrent.Class.MonadSTM (MonadSTM) import Control.DeepSeq (NFData (..)) @@ -31,7 +32,6 @@ import Control.Monad.Class.MonadST (MonadST) import Control.Monad.Class.MonadThrow (MonadMask) import Control.Monad.Primitive (PrimMonad) import Control.RefCount -import Control.TempRegistry import Data.Foldable (sequenceA_, traverse_) import Data.Primitive.PrimVar import Data.Text (Text) @@ -215,7 +215,7 @@ toSnapMergingRunState (MR.OngoingMerge rs (MR.SpentCreditsVar spentCreditsVar) m -------------------------------------------------------------------------------} {-# SPECIALISE snapshotRuns :: - TempRegistry IO + ActionRegistry IO -> HasBlockIO IO h -> NamedSnapshotDir -> SnapLevels (Ref (Run IO h)) @@ -225,8 +225,8 @@ toSnapMergingRunState (MR.OngoingMerge rs (MR.SpentCreditsVar spentCreditsVar) m -- the @targetDir@ directory. The hard links and the @targetDir@ are made -- durable on disk. 
snapshotRuns :: - (MonadMask m, MonadMVar m) - => TempRegistry m + (MonadMask m, PrimMonad m) + => ActionRegistry m -> HasBlockIO m h -> NamedSnapshotDir -> SnapLevels (Ref (Run m h)) @@ -245,7 +245,7 @@ snapshotRuns reg hbio0 (NamedSnapshotDir targetDir) levels = do pure levels' {-# SPECIALISE openRuns :: - TempRegistry IO + ActionRegistry IO -> HasFS IO h -> HasBlockIO IO h -> TableConfig @@ -263,7 +263,7 @@ snapshotRuns reg hbio0 (NamedSnapshotDir targetDir) levels = do -- The result must ultimately be released using 'releaseRuns'. openRuns :: (MonadMask m, MonadSTM m, MonadST m, MonadMVar m) - => TempRegistry m + => ActionRegistry m -> HasFS m h -> HasBlockIO m h -> TableConfig @@ -285,25 +285,25 @@ openRuns let targetPaths = RunFsPaths targetDir runNum' hardLinkRunFiles reg hfs hbio NoHardLinkDurable sourcePaths targetPaths - allocateTemp reg + withRollback reg (Run.openFromDisk hfs hbio caching targetPaths) releaseRef pure (SnapLevels levels') {-# SPECIALISE releaseRuns :: - TempRegistry IO -> SnapLevels (Ref (Run IO h)) -> IO () + ActionRegistry IO -> SnapLevels (Ref (Run IO h)) -> IO () #-} releaseRuns :: - (MonadMask m, MonadST m, MonadMVar m) - => TempRegistry m -> SnapLevels (Ref (Run m h)) -> m () -releaseRuns reg = traverse_ $ \r -> freeTemp reg (releaseRef r) + (MonadMask m, MonadST m) + => ActionRegistry m -> SnapLevels (Ref (Run m h)) -> m () +releaseRuns reg = traverse_ $ \r -> delayedCommit reg (releaseRef r) {------------------------------------------------------------------------------- Opening from levels snapshot format -------------------------------------------------------------------------------} {-# SPECIALISE fromSnapLevels :: - TempRegistry IO + ActionRegistry IO -> HasFS IO h -> HasBlockIO IO h -> TableConfig @@ -316,7 +316,7 @@ releaseRuns reg = traverse_ $ \r -> freeTemp reg (releaseRef r) -- | Duplicates runs and re-creates merging runs. fromSnapLevels :: forall m h. 
(MonadMask m, MonadMVar m, MonadSTM m, MonadST m) - => TempRegistry m + => ActionRegistry m -> HasFS m h -> HasBlockIO m h -> TableConfig @@ -347,11 +347,11 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve fromSnapIncomingRun (SnapMergingRun mpfl nr ne unspentCredits smrs) = do Merging mpfl <$> case smrs of SnapCompletedMerge run -> - allocateTemp reg (MR.newCompleted nr ne run) releaseRef + withRollback reg (MR.newCompleted nr ne run) releaseRef SnapOngoingMerge runs spentCredits lvl -> do rn <- uniqueToRunNumber <$> incrUniqCounter uc - mr <- allocateTemp reg + mr <- withRollback reg (MR.new hfs hbio resolve caching alloc lvl (mkPath rn) runs) releaseRef -- When a snapshot is created, merge progress is lost, so we @@ -363,7 +363,7 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve MR.supplyCredits (MR.Credits c) (creditThresholdForLevel conf ln) mr return mr - dupRun r = allocateTemp reg (dupRef r) releaseRef + dupRun r = withRollback reg (dupRef r) releaseRef {------------------------------------------------------------------------------- Hard links @@ -373,7 +373,7 @@ data HardLinkDurable = HardLinkDurable | NoHardLinkDurable deriving stock Eq {-# SPECIALISE hardLinkRunFiles :: - TempRegistry IO + ActionRegistry IO -> HasFS IO h -> HasBlockIO IO h -> HardLinkDurable @@ -385,8 +385,8 @@ data HardLinkDurable = HardLinkDurable | NoHardLinkDurable -- name for the new directory entry. If @dur == HardLinkDurabl@, the links will -- also be made durable on disk. 
hardLinkRunFiles :: - (MonadMask m, MonadMVar m) - => TempRegistry m + (MonadMask m, PrimMonad m) + => ActionRegistry m -> HasFS m h -> HasBlockIO m h -> HardLinkDurable @@ -400,7 +400,7 @@ hardLinkRunFiles reg hfs hbio dur sourceRunFsPaths targetRunFsPaths = do hardLinkTemp (runChecksumsPath sourceRunFsPaths) (runChecksumsPath targetRunFsPaths) where hardLinkTemp sourcePath targetPath = do - allocateTemp reg + withRollback reg (FS.createHardLink hbio sourcePath targetPath) (\_ -> FS.removeFile hfs targetPath) when (dur == HardLinkDurable) $ diff --git a/test/Test/Database/LSMTree/Internal/Run.hs b/test/Test/Database/LSMTree/Internal/Run.hs index 9612bd722..6d77a3a36 100644 --- a/test/Test/Database/LSMTree/Internal/Run.hs +++ b/test/Test/Database/LSMTree/Internal/Run.hs @@ -22,8 +22,8 @@ import Test.Tasty (TestTree, testGroup) import Test.Tasty.HUnit (assertEqual, testCase, (@=?), (@?)) import Test.Tasty.QuickCheck +import Control.ActionRegistry (withActionRegistry) import Control.RefCount -import Control.TempRegistry (withTempRegistry) import Database.LSMTree.Extras.Generators (KeyForIndexCompact (..)) import Database.LSMTree.Extras.RunData import Database.LSMTree.Internal.BlobRef (BlobSpan (..)) @@ -187,7 +187,7 @@ prop_WriteAndOpen :: -> IO Property prop_WriteAndOpen fs hbio wb = withRun fs hbio (simplePath 1337) (serialiseRunData wb) $ \written -> - withTempRegistry $ \reg -> do + withActionRegistry $ \reg -> do let paths = Run.runFsPaths written paths' = paths { runNumber = RunNumber 17} hardLinkRunFiles reg fs hbio NoHardLinkDurable paths paths' From 8c54ecc95c46a833664dee52aae90f1b9b993ca1 Mon Sep 17 00:00:00 2001 From: Joris Dral Date: Tue, 24 Dec 2024 09:54:50 +0100 Subject: [PATCH 2/4] Remove `TempRegistry` now that it has become unused --- lsm-tree.cabal | 11 +- src-control/Control/TempRegistry.hs | 226 ---------------------------- 2 files changed, 4 insertions(+), 233 deletions(-) delete mode 100644 src-control/Control/TempRegistry.hs diff --git 
a/lsm-tree.cabal b/lsm-tree.cabal index 4df43071c..be435c91b 100644 --- a/lsm-tree.cabal +++ b/lsm-tree.cabal @@ -846,16 +846,13 @@ library control Control.ActionRegistry Control.Concurrent.Class.MonadSTM.RWVar Control.RefCount - Control.TempRegistry build-depends: - , base >=4.14 && <4.22 - , containers ^>=0.6 || ^>=0.7 - , deepseq ^>=1.4 || ^>=1.5 - , io-classes ^>=1.6 || ^>=1.7 - , io-classes:strict-mvar + , base >=4.14 && <4.22 + , deepseq ^>=1.4 || ^>=1.5 + , io-classes ^>=1.6 || ^>=1.7 , io-classes:strict-stm - , primitive ^>=0.9 + , primitive ^>=0.9 test-suite control-test import: language, warnings diff --git a/src-control/Control/TempRegistry.hs b/src-control/Control/TempRegistry.hs deleted file mode 100644 index a70012eb0..000000000 --- a/src-control/Control/TempRegistry.hs +++ /dev/null @@ -1,226 +0,0 @@ --- TODO: we are starting to use the TempRegistry for more than just resource --- allocation/release, we are more generally using it for /actions that can be --- rolled back/ and /actions that are delayed/. Maybe we should reframe the use --- cases for the 'TempRegistry', and do some renaming: --- * Rename @'allocateTemp'*@ to @'withRollback'*@ --- * Rename @'freeTemp'@ to @'delayUntilEnd'@ -module Control.TempRegistry ( - TempRegistry - , withTempRegistry - , allocateTemp - , allocateMaybeTemp - , allocateEitherTemp - , freeTemp - , modifyWithTempRegistry - , modifyWithTempRegistry_ - ) where - -import Control.Concurrent.Class.MonadMVar.Strict -import Control.Monad -import Control.Monad.Class.MonadThrow -import Data.Map.Strict (Map) -import qualified Data.Map.Strict as Map -import Data.Void - --- | A temporary registry for resources that are bound to end up in some final --- state, after which they /should/ be guaranteed to be released correctly. --- --- It is the responsibility of the user to guarantee that this final state is --- released correctly in the presence of async exceptions. 
--- --- NOTE: this is based on [the @ResourceRegistry@ module from @ouroboros-consensus@](https://github.com/IntersectMBO/ouroboros-consensus/blob/main/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/ResourceRegistry.hs). --- --- There are some differences between @WithTempRegistry@ from --- @ouroboros-consensus@ and our 'TempRegistry'. For one, 'TempRegistry' allows --- for the temporary /freeing/ of resources, which @WithTempRegistry@ does not. --- However, @WithTempRegistry@ can check whether newly allocated resources --- actually end up in the final state. --- --- TODO: make 'TempRegistry' more sophisticated. Ideas: --- --- * Use a similar approach (like in 'WithTempRegistry@) for checking that --- temporarily allocated resources end up in the final state, and that --- temporarily freed resources are removed from the final state. --- --- * Statically disallow using a resource after @freeTemp@, for example through --- data abstraction. --- --- TODO: could https://hackage.haskell.org/package/resourcet be a suitable --- abstraction instead of 'TempRegistry'? 
-newtype TempRegistry m = TempRegistry { - tempRegistryState :: StrictMVar m (TempRegistryState m) - } - -data TempRegistryState m = TempRegistryState { - tempAllocated :: !(Map ResourceId (Resource m)) - , tempFreed :: !(Map ResourceId (Resource m)) - , nextId :: !ResourceId - } - -newtype ResourceId = ResourceId Int - deriving stock (Eq, Ord) - deriving newtype (Num) - -newtype Resource m = Resource { - resourceRelease :: (m ()) - } - -{-# SPECIALISE withTempRegistry :: (TempRegistry IO -> IO a) -> IO a #-} -withTempRegistry :: - (MonadMVar m, MonadCatch m) - => (TempRegistry m -> m a) - -> m a -withTempRegistry k = fst <$> generalBracket acquire release k - where - acquire = unsafeNewTempRegistry - release reg ec = unsafeReleaseTempRegistry reg ec - -{-# SPECIALISE unsafeNewTempRegistry :: IO (TempRegistry IO) #-} --- | This is considered unsafe, because one should properly 'bracket' this --- function. Example: --- --- @ --- generalBracket unsafeNewTempRegistry unsafeReleaseTempRegistry --- @ -unsafeNewTempRegistry :: MonadMVar m => m (TempRegistry m) -unsafeNewTempRegistry = TempRegistry <$> newMVar (TempRegistryState Map.empty Map.empty (ResourceId 0)) - -{-# SPECIALISE unsafeReleaseTempRegistry :: TempRegistry IO -> ExitCase a -> IO () #-} --- | See 'unsafeNewTempRegistry'. -unsafeReleaseTempRegistry :: MonadMVar m => TempRegistry m -> ExitCase a -> m () -unsafeReleaseTempRegistry reg ec = case ec of - ExitCaseSuccess{} -> mapM_ resourceRelease . tempFreed =<< takeMVar (tempRegistryState reg) - _ -> mapM_ resourceRelease . tempAllocated =<< takeMVar (tempRegistryState reg) - - -{-# SPECIALISE allocateTemp :: TempRegistry IO -> IO a -> (a -> IO ()) -> IO a #-} --- | Temporarily allocate a resource. --- --- This runs the @acquire@ function with async exceptions masked to ensure that --- acquired resources are always put into the registry. 
However, note that in --- general the following two expressions are not equivalent: --- --- @ --- allocateTemp reg acquire free --- acquire >>= \x -> allocateTemp reg free (pure x) --- @ --- --- Assuming that @acquire@ is not already exception safe, it is /not/ --- exception-safe to pass the result of @acquire@ to @allocateTemp@: an async --- exception could be thrown in between @acquire@ and @allocateTemp@, which --- leaks resources. -allocateTemp :: (MonadMask m, MonadMVar m) => - TempRegistry m - -> m a - -> (a -> m ()) - -> m a -allocateTemp reg acquire free = - mustBeRight <$!> allocateEitherTemp reg (fmap Right acquire) free - where - mustBeRight :: Either Void a -> a - mustBeRight (Left v) = absurd v - mustBeRight (Right a) = a - -{-# SPECIALISE allocateMaybeTemp :: TempRegistry IO -> IO (Maybe a) -> (a -> IO ()) -> IO (Maybe a) #-} --- | Like 'allocateTemp', but for resources that might fail to be acquired. -allocateMaybeTemp :: - (MonadMask m, MonadMVar m) - => TempRegistry m - -> m (Maybe a) - -> (a -> m ()) - -> m (Maybe a) -allocateMaybeTemp reg acquire free = - fromEither <$!> allocateEitherTemp reg (toEither <$> acquire) free - where - toEither :: Maybe a -> Either () a - toEither Nothing = Left () - toEither (Just x) = Right x - - fromEither :: Either () a -> Maybe a - fromEither (Left ()) = Nothing - fromEither (Right x) = Just $! x - -{-# SPECIALISE allocateEitherTemp :: TempRegistry IO -> IO (Either e a) -> (a -> IO ()) -> IO (Either e a) #-} --- | Like 'allocateTemp', but for resources that might fail to be acquired. 
-allocateEitherTemp :: - (MonadMask m, MonadMVar m) - => TempRegistry m - -> m (Either e a) - -> (a -> m ()) - -> m (Either e a) -allocateEitherTemp reg acquire free = - mask_ $ do - eith <- acquire - case eith of - Left e -> pure $ Left e - Right x -> do - modifyMVar_ (tempRegistryState reg) $ \st -> do - let rid = nextId st - rid' = rid + 1 - pure TempRegistryState { - tempAllocated = Map.insert rid (Resource (free x)) (tempAllocated st) - , tempFreed = tempFreed st - , nextId = rid' - } - pure $ Right x - -{-# SPECIALISE freeTemp :: TempRegistry IO -> IO () -> IO () #-} --- | Temporarily free a resource. --- --- NOTE: the resource is not actually released until the 'TempRegistry' is --- released. This makes rolling back simple, but it means that /use after free/ --- within the scope of a 'TempRegistry' will work just as if there had been no --- free at all. As such, though it is not recommended to rely on this --- peculiarity, the following is safe: --- --- @ --- allocateTemp reg free acquire >>= \x -> --- freeTemp reg (free x) >>= \_ -> {- do something with x -} --- @ -freeTemp :: MonadMVar m => TempRegistry m -> m () -> m () -freeTemp reg free = modifyMVarMasked_ (tempRegistryState reg) $ \st -> do - let rid = nextId st - rid' = rid + 1 - pure TempRegistryState { - tempAllocated = tempAllocated st - , tempFreed = Map.insert rid (Resource free) (tempFreed st) - , nextId = rid' - } - -{-# SPECIALISE modifyWithTempRegistry :: IO st -> (st -> IO ()) -> (TempRegistry IO -> st -> IO (st, a)) -> IO a #-} --- | Exception-safe modification of state with a temporary registry. --- --- [Example:] When we modify a table's content (stored in a mutable variable), --- we might add new runs to the levels, or remove old runs from the level. If an --- exception is thrown before putting the updated table contents into the --- variable, then any resources that were acquired or released in the meantime --- should be rolled back. 
The 'TempRegistry' can be used to "temporarily" --- allocate or free resources, the effects of which are rolled back in case of --- an exception, or put into the final state when no exceptions were raised. -modifyWithTempRegistry :: - (MonadMVar m, MonadCatch m) - => m st -- ^ Get the state - -> (st -> m ()) -- ^ Store a state - -> (TempRegistry m -> st -> m (st, a)) -- ^ Modify the state - -> m a -modifyWithTempRegistry getSt putSt action = - snd . fst <$> generalBracket acquire release (uncurry action) - where - acquire = (,) <$> unsafeNewTempRegistry <*> getSt - release (reg, oldSt) ec = do - case ec of - ExitCaseSuccess (newSt, _) -> putSt newSt - ExitCaseException _ -> putSt oldSt - ExitCaseAbort -> putSt oldSt - unsafeReleaseTempRegistry reg ec - -{-# SPECIALISE modifyWithTempRegistry_ :: IO st -> (st -> IO ()) -> (TempRegistry IO -> st -> IO st) -> IO () #-} --- | Like 'modifyWithTempRegistry', but without a return value. -modifyWithTempRegistry_ :: - (MonadMVar m, MonadCatch m) - => m st -- ^ Get the state - -> (st -> m ()) -- ^ Store a state - -> (TempRegistry m -> st -> m st) - -> m () -modifyWithTempRegistry_ getSt putSt action = - modifyWithTempRegistry getSt putSt (\reg content -> (,()) <$> action reg content) From 640f48a86a03a31d9ecdb371ee11605c7a30a95a Mon Sep 17 00:00:00 2001 From: Joris Dral Date: Tue, 24 Dec 2024 09:56:30 +0100 Subject: [PATCH 3/4] Action registry: run delayed actions in FIFO order instead of LIFO Say we have two delayed actions that we register in the following order: close a file handle, and remove the corresponding file. Previously, when the action registry was committed, we would run the delayed actions in LIFO order, so the file would be removed before its handle was closed. This led to an error, because we can't remove a file while we also have an open handle to that file. Now that we run the delayed actions in FIFO order, the handle is closed first and the removal works correctly.
--- src-control/Control/ActionRegistry.hs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src-control/Control/ActionRegistry.hs b/src-control/Control/ActionRegistry.hs index ab85bb9cc..c40f61f60 100644 --- a/src-control/Control/ActionRegistry.hs +++ b/src-control/Control/ActionRegistry.hs @@ -247,8 +247,8 @@ unsafeFinaliseActionRegistry reg ec = case ec of unsafeCommitActionRegistry :: (PrimMonad m, MonadCatch m) => ActionRegistry m -> m () unsafeCommitActionRegistry reg = do as <- readMutVar (registryDelay reg) - -- Run actions in LIFO order - r <- runActions as + -- Run actions in FIFO order + r <- runActions (reverse as) case NE.nonEmpty r of Nothing -> pure () Just exceptions -> throwIO (CommitActionRegistryError exceptions) From 6e6f72698662029a3c7663f270d7f1654c14a969 Mon Sep 17 00:00:00 2001 From: Joris Dral Date: Tue, 24 Dec 2024 10:19:06 +0100 Subject: [PATCH 4/4] Prevent `NoThunks` failures using strict(er) vector operations --- src/Database/LSMTree/Internal/MergeSchedule.hs | 10 +++++----- src/Database/LSMTree/Internal/Vector.hs | 6 ++++++ 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/Database/LSMTree/Internal/MergeSchedule.hs b/src/Database/LSMTree/Internal/MergeSchedule.hs index 537cccb4d..5957bb4f2 100644 --- a/src/Database/LSMTree/Internal/MergeSchedule.hs +++ b/src/Database/LSMTree/Internal/MergeSchedule.hs @@ -58,7 +58,7 @@ import Database.LSMTree.Internal.RunNumber import Database.LSMTree.Internal.Serialise (SerialisedBlob, SerialisedKey, SerialisedValue) import Database.LSMTree.Internal.UniqCounter -import Database.LSMTree.Internal.Vector (mapStrict) +import Database.LSMTree.Internal.Vector (forMStrict, mapStrict) import Database.LSMTree.Internal.WriteBuffer (WriteBuffer) import qualified Database.LSMTree.Internal.WriteBuffer as WB import Database.LSMTree.Internal.WriteBufferBlobs (WriteBufferBlobs) @@ -200,7 +200,7 @@ mkLevelsCache reg lvls = do -> Levels m h -> m a foldRunAndMergeM k1 k2 ls = - fmap 
fold $ V.forM ls $ \(Level ir rs) -> do + fmap fold $ forMStrict ls $ \(Level ir rs) -> do incoming <- case ir of Single r -> k1 r Merging _ mr -> k2 mr @@ -251,7 +251,7 @@ duplicateLevelsCache :: -> LevelsCache m h -> m (LevelsCache m h) duplicateLevelsCache reg cache = do - rs' <- V.forM (cachedRuns cache) $ \r -> + rs' <- forMStrict (cachedRuns cache) $ \r -> withRollback reg (dupRef r) releaseRef return cache { cachedRuns = rs' } @@ -300,9 +300,9 @@ duplicateLevels :: -> Levels m h -> m (Levels m h) duplicateLevels reg levels = - V.forM levels $ \Level {incomingRun, residentRuns} -> do + forMStrict levels $ \Level {incomingRun, residentRuns} -> do incomingRun' <- duplicateIncomingRun reg incomingRun - residentRuns' <- V.forM residentRuns $ \r -> + residentRuns' <- forMStrict residentRuns $ \r -> withRollback reg (dupRef r) releaseRef return $! Level { incomingRun = incomingRun', diff --git a/src/Database/LSMTree/Internal/Vector.hs b/src/Database/LSMTree/Internal/Vector.hs index 618eb7f7f..e59639c6d 100644 --- a/src/Database/LSMTree/Internal/Vector.hs +++ b/src/Database/LSMTree/Internal/Vector.hs @@ -9,6 +9,7 @@ module Database.LSMTree.Internal.Vector ( mapStrict, mapMStrict, imapMStrict, + forMStrict, zipWithStrict, binarySearchL, unsafeInsertWithMStrict, @@ -79,6 +80,11 @@ imapMStrict f v = V.imapM (\i -> f i >=> (pure $!)) v zipWithStrict :: forall a b c. (a -> b -> c) -> V.Vector a -> V.Vector b -> V.Vector c zipWithStrict f xs ys = runST (V.zipWithM (\x y -> pure $! f x y) xs ys) +-- | \( O(n) \) Like 'V.forM', but strict in the produced elements of type @b@. +{-# INLINE forMStrict #-} +forMStrict :: Monad m => V.Vector a -> (a -> m b) -> m (V.Vector b) +forMStrict xs f = V.forM xs (f >=> (pure $!)) + {-| Finds the lowest index in a given sorted vector at which the given element could be inserted while maintaining the sortedness.