diff --git a/lsm-tree.cabal b/lsm-tree.cabal index 4df43071c..be435c91b 100644 --- a/lsm-tree.cabal +++ b/lsm-tree.cabal @@ -846,16 +846,13 @@ library control Control.ActionRegistry Control.Concurrent.Class.MonadSTM.RWVar Control.RefCount - Control.TempRegistry build-depends: - , base >=4.14 && <4.22 - , containers ^>=0.6 || ^>=0.7 - , deepseq ^>=1.4 || ^>=1.5 - , io-classes ^>=1.6 || ^>=1.7 - , io-classes:strict-mvar + , base >=4.14 && <4.22 + , deepseq ^>=1.4 || ^>=1.5 + , io-classes ^>=1.6 || ^>=1.7 , io-classes:strict-stm - , primitive ^>=0.9 + , primitive ^>=0.9 test-suite control-test import: language, warnings diff --git a/src-control/Control/ActionRegistry.hs b/src-control/Control/ActionRegistry.hs index 7819c2400..c40f61f60 100644 --- a/src-control/Control/ActionRegistry.hs +++ b/src-control/Control/ActionRegistry.hs @@ -34,8 +34,6 @@ import Data.List.NonEmpty (NonEmpty (..)) import qualified Data.List.NonEmpty as NE import Data.Primitive.MutVar --- TODO: replace TempRegistry by ActionRegistry - -- TODO: add tests using fs-sim/io-sim to make sure exception safety is -- guaranteed. @@ -249,8 +247,8 @@ unsafeFinaliseActionRegistry reg ec = case ec of unsafeCommitActionRegistry :: (PrimMonad m, MonadCatch m) => ActionRegistry m -> m () unsafeCommitActionRegistry reg = do as <- readMutVar (registryDelay reg) - -- Run actions in LIFO order - r <- runActions as + -- Run actions in FIFO order + r <- runActions (reverse as) case NE.nonEmpty r of Nothing -> pure () Just exceptions -> throwIO (CommitActionRegistryError exceptions) diff --git a/src-control/Control/TempRegistry.hs b/src-control/Control/TempRegistry.hs deleted file mode 100644 index a70012eb0..000000000 --- a/src-control/Control/TempRegistry.hs +++ /dev/null @@ -1,226 +0,0 @@ --- TODO: we are starting to use the TempRegistry for more than just resource --- allocation/release, we are more generally using it for /actions that can be --- rolled back/ and /actions that are delayed/. Maybe we should reframe the use --- cases for the 'TempRegistry', and do some renaming: --- * Rename @'allocateTemp'*@ to @'withRollback'*@ --- * Rename @'freeTemp'@ to @'delayUntilEnd'@ -module Control.TempRegistry ( - TempRegistry - , withTempRegistry - , allocateTemp - , allocateMaybeTemp - , allocateEitherTemp - , freeTemp - , modifyWithTempRegistry - , modifyWithTempRegistry_ - ) where - -import Control.Concurrent.Class.MonadMVar.Strict -import Control.Monad -import Control.Monad.Class.MonadThrow -import Data.Map.Strict (Map) -import qualified Data.Map.Strict as Map -import Data.Void - --- | A temporary registry for resources that are bound to end up in some final --- state, after which they /should/ be guaranteed to be released correctly. --- --- It is the responsibility of the user to guarantee that this final state is --- released correctly in the presence of async exceptions. --- --- NOTE: this is based on [the @ResourceRegistry@ module from @ouroboros-consensus@](https://github.com/IntersectMBO/ouroboros-consensus/blob/main/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/ResourceRegistry.hs). --- --- There are some differences between @WithTempRegistry@ from --- @ouroboros-consensus@ and our 'TempRegistry'. For one, 'TempRegistry' allows --- for the temporary /freeing/ of resources, which @WithTempRegistry@ does not. --- However, @WithTempRegistry@ can check whether newly allocated resources --- actually end up in the final state. --- --- TODO: make 'TempRegistry' more sophisticated. Ideas: --- --- * Use a similar approach (like in 'WithTempRegistry@) for checking that --- temporarily allocated resources end up in the final state, and that --- temporarily freed resources are removed from the final state. --- --- * Statically disallow using a resource after @freeTemp@, for example through --- data abstraction. --- --- TODO: could https://hackage.haskell.org/package/resourcet be a suitable --- abstraction instead of 'TempRegistry'? -newtype TempRegistry m = TempRegistry { - tempRegistryState :: StrictMVar m (TempRegistryState m) - } - -data TempRegistryState m = TempRegistryState { - tempAllocated :: !(Map ResourceId (Resource m)) - , tempFreed :: !(Map ResourceId (Resource m)) - , nextId :: !ResourceId - } - -newtype ResourceId = ResourceId Int - deriving stock (Eq, Ord) - deriving newtype (Num) - -newtype Resource m = Resource { - resourceRelease :: (m ()) - } - -{-# SPECIALISE withTempRegistry :: (TempRegistry IO -> IO a) -> IO a #-} -withTempRegistry :: - (MonadMVar m, MonadCatch m) - => (TempRegistry m -> m a) - -> m a -withTempRegistry k = fst <$> generalBracket acquire release k - where - acquire = unsafeNewTempRegistry - release reg ec = unsafeReleaseTempRegistry reg ec - -{-# SPECIALISE unsafeNewTempRegistry :: IO (TempRegistry IO) #-} --- | This is considered unsafe, because one should properly 'bracket' this --- function. Example: --- --- @ --- generalBracket unsafeNewTempRegistry unsafeReleaseTempRegistry --- @ -unsafeNewTempRegistry :: MonadMVar m => m (TempRegistry m) -unsafeNewTempRegistry = TempRegistry <$> newMVar (TempRegistryState Map.empty Map.empty (ResourceId 0)) - -{-# SPECIALISE unsafeReleaseTempRegistry :: TempRegistry IO -> ExitCase a -> IO () #-} --- | See 'unsafeNewTempRegistry'. -unsafeReleaseTempRegistry :: MonadMVar m => TempRegistry m -> ExitCase a -> m () -unsafeReleaseTempRegistry reg ec = case ec of - ExitCaseSuccess{} -> mapM_ resourceRelease . tempFreed =<< takeMVar (tempRegistryState reg) - _ -> mapM_ resourceRelease . tempAllocated =<< takeMVar (tempRegistryState reg) - - -{-# SPECIALISE allocateTemp :: TempRegistry IO -> IO a -> (a -> IO ()) -> IO a #-} --- | Temporarily allocate a resource. --- --- This runs the @acquire@ function with async exceptions masked to ensure that --- acquired resources are always put into the registry. However, note that in --- general the following two expressions are not equivalent: --- --- @ --- allocateTemp reg acquire free --- acquire >>= \x -> allocateTemp reg free (pure x) --- @ --- --- Assuming that @acquire@ is not already exception safe, it is /not/ --- exception-safe to pass the result of @acquire@ to @allocateTemp@: an async --- exception could be thrown in between @acquire@ and @allocateTemp@, which --- leaks resources. -allocateTemp :: (MonadMask m, MonadMVar m) => - TempRegistry m - -> m a - -> (a -> m ()) - -> m a -allocateTemp reg acquire free = - mustBeRight <$!> allocateEitherTemp reg (fmap Right acquire) free - where - mustBeRight :: Either Void a -> a - mustBeRight (Left v) = absurd v - mustBeRight (Right a) = a - -{-# SPECIALISE allocateMaybeTemp :: TempRegistry IO -> IO (Maybe a) -> (a -> IO ()) -> IO (Maybe a) #-} --- | Like 'allocateTemp', but for resources that might fail to be acquired. -allocateMaybeTemp :: - (MonadMask m, MonadMVar m) - => TempRegistry m - -> m (Maybe a) - -> (a -> m ()) - -> m (Maybe a) -allocateMaybeTemp reg acquire free = - fromEither <$!> allocateEitherTemp reg (toEither <$> acquire) free - where - toEither :: Maybe a -> Either () a - toEither Nothing = Left () - toEither (Just x) = Right x - - fromEither :: Either () a -> Maybe a - fromEither (Left ()) = Nothing - fromEither (Right x) = Just $! x - -{-# SPECIALISE allocateEitherTemp :: TempRegistry IO -> IO (Either e a) -> (a -> IO ()) -> IO (Either e a) #-} --- | Like 'allocateTemp', but for resources that might fail to be acquired. -allocateEitherTemp :: - (MonadMask m, MonadMVar m) - => TempRegistry m - -> m (Either e a) - -> (a -> m ()) - -> m (Either e a) -allocateEitherTemp reg acquire free = - mask_ $ do - eith <- acquire - case eith of - Left e -> pure $ Left e - Right x -> do - modifyMVar_ (tempRegistryState reg) $ \st -> do - let rid = nextId st - rid' = rid + 1 - pure TempRegistryState { - tempAllocated = Map.insert rid (Resource (free x)) (tempAllocated st) - , tempFreed = tempFreed st - , nextId = rid' - } - pure $ Right x - -{-# SPECIALISE freeTemp :: TempRegistry IO -> IO () -> IO () #-} --- | Temporarily free a resource. --- --- NOTE: the resource is not actually released until the 'TempRegistry' is --- released. This makes rolling back simple, but it means that /use after free/ --- within the scope of a 'TempRegistry' will work just as if there had been no --- free at all. As such, though it is not recommended to rely on this --- peculiarity, the following is safe: --- --- @ --- allocateTemp reg free acquire >>= \x -> --- freeTemp reg (free x) >>= \_ -> {- do something with x -} --- @ -freeTemp :: MonadMVar m => TempRegistry m -> m () -> m () -freeTemp reg free = modifyMVarMasked_ (tempRegistryState reg) $ \st -> do - let rid = nextId st - rid' = rid + 1 - pure TempRegistryState { - tempAllocated = tempAllocated st - , tempFreed = Map.insert rid (Resource free) (tempFreed st) - , nextId = rid' - } - -{-# SPECIALISE modifyWithTempRegistry :: IO st -> (st -> IO ()) -> (TempRegistry IO -> st -> IO (st, a)) -> IO a #-} --- | Exception-safe modification of state with a temporary registry. --- --- [Example:] When we modify a table's content (stored in a mutable variable), --- we might add new runs to the levels, or remove old runs from the level. If an --- exception is thrown before putting the updated table contents into the --- variable, then any resources that were acquired or released in the meantime --- should be rolled back. The 'TempRegistry' can be used to "temporarily" --- allocate or free resources, the effects of which are rolled back in case of --- an exception, or put into the final state when no exceptions were raised. -modifyWithTempRegistry :: - (MonadMVar m, MonadCatch m) - => m st -- ^ Get the state - -> (st -> m ()) -- ^ Store a state - -> (TempRegistry m -> st -> m (st, a)) -- ^ Modify the state - -> m a -modifyWithTempRegistry getSt putSt action = - snd . fst <$> generalBracket acquire release (uncurry action) - where - acquire = (,) <$> unsafeNewTempRegistry <*> getSt - release (reg, oldSt) ec = do - case ec of - ExitCaseSuccess (newSt, _) -> putSt newSt - ExitCaseException _ -> putSt oldSt - ExitCaseAbort -> putSt oldSt - unsafeReleaseTempRegistry reg ec - -{-# SPECIALISE modifyWithTempRegistry_ :: IO st -> (st -> IO ()) -> (TempRegistry IO -> st -> IO st) -> IO () #-} --- | Like 'modifyWithTempRegistry', but without a return value. -modifyWithTempRegistry_ :: - (MonadMVar m, MonadCatch m) - => m st -- ^ Get the state - -> (st -> m ()) -- ^ Store a state - -> (TempRegistry m -> st -> m st) - -> m () -modifyWithTempRegistry_ getSt putSt action = - modifyWithTempRegistry getSt putSt (\reg content -> (,()) <$> action reg content) diff --git a/src/Database/LSMTree/Internal.hs b/src/Database/LSMTree/Internal.hs index 7d5dde759..c755e4027 100644 --- a/src/Database/LSMTree/Internal.hs +++ b/src/Database/LSMTree/Internal.hs @@ -83,7 +83,6 @@ import Control.Monad.Class.MonadST (MonadST (..)) import Control.Monad.Class.MonadThrow import Control.Monad.Primitive import Control.RefCount -import Control.TempRegistry import Control.Tracer import Data.Arena (ArenaManager, newArenaManager) import Data.Either (fromRight) @@ -702,12 +701,12 @@ new :: new sesh conf = do traceWith (sessionTracer sesh) TraceNewTable withOpenSession sesh $ \seshEnv -> - withTempRegistry $ \reg -> do + withActionRegistry $ \reg -> do am <- newArenaManager blobpath <- Paths.tableBlobPath (sessionRoot seshEnv) <$> incrUniqCounter (sessionUniqCounter seshEnv) tableWriteBufferBlobs - <- allocateTemp reg + <- withRollback reg (WBB.new (sessionHasFS seshEnv) blobpath) releaseRef let tableWriteBuffer = WB.empty @@ -722,7 +721,7 @@ new sesh conf = do newWith reg sesh seshEnv conf am tc {-# SPECIALISE newWith :: - TempRegistry IO + ActionRegistry IO -> Session IO h -> SessionEnv IO h -> TableConfig @@ -730,8 +729,8 @@ new sesh conf = do -> TableContent IO h -> IO (Table IO h) #-} newWith :: - (MonadSTM m, MonadMVar m) - => TempRegistry m + (MonadSTM m, MonadMVar m, PrimMonad m) + => ActionRegistry m -> Session m h -> SessionEnv m h -> TableConfig @@ -754,8 +753,9 @@ newWith reg sesh seshEnv conf !am !tc = do let !tid = uniqueToWord64 tableId !t = Table conf tableVar am tr tid sesh -- Track the current table - freeTemp reg $ modifyMVar_ (sessionOpenTables seshEnv) - $ pure . Map.insert (uniqueToWord64 tableId) t + delayedCommit reg $ + modifyMVar_ (sessionOpenTables seshEnv) $ + pure . Map.insert (uniqueToWord64 tableId) t pure $! t {-# SPECIALISE close :: Table IO h -> IO () #-} @@ -766,7 +766,7 @@ close :: -> m () close t = do traceWith (tableTracer t) TraceCloseTable - modifyWithTempRegistry_ + modifyWithActionRegistry_ (RW.unsafeAcquireWriteAccess (tableState t)) (atomically . RW.unsafeReleaseWriteAccess (tableState t)) $ \reg -> \case TableClosed -> pure TableClosed @@ -774,7 +774,7 @@ close t = do -- Since we have a write lock on the table state, we know that we are the -- only thread currently closing the table. We can safely make the session -- forget about this table. - freeTemp reg (tableSessionUntrackTable (tableId t) thEnv) + delayedCommit reg (tableSessionUntrackTable (tableId t) thEnv) RW.withWriteAccess_ (tableContent thEnv) $ \tc -> do releaseTableContent reg tc pure tc @@ -868,7 +868,7 @@ updates resolve es t = do let conf = tableConfig t withOpenTable t $ \thEnv -> do let hfs = tableHasFS thEnv - modifyWithTempRegistry_ + modifyWithActionRegistry_ (RW.unsafeAcquireWriteAccess (tableContent thEnv)) (atomically . RW.unsafeReleaseWriteAccess (tableContent thEnv)) $ \reg -> do updatesWithInterleavedFlushes @@ -1005,10 +1005,10 @@ newCursor !offsetKey t = withOpenTable t $ \thEnv -> do -- We acquire a read-lock on the session open-state to prevent races, see -- 'sessionOpenTables'. withOpenSession cursorSession $ \_ -> do - withTempRegistry $ \reg -> do + withActionRegistry $ \reg -> do (wb, wbblobs, cursorRuns) <- dupTableContent reg (tableContent thEnv) cursorReaders <- - allocateMaybeTemp reg + withRollbackMaybe reg (Readers.new offsetKey (Just (wb, wbblobs)) cursorRuns) Readers.close let cursorWBB = wbblobs @@ -1017,9 +1017,9 @@ newCursor !offsetKey t = withOpenTable t $ \thEnv -> do -- Track cursor, but careful: If now an exception is raised, all -- resources get freed by the registry, so if the session still -- tracks 'cursor' (which is 'CursorOpen'), it later double frees. - -- Therefore, we only track the cursor if 'withTempRegistry' exits - -- successfully, i.e. using 'freeTemp'. - freeTemp reg $ + -- Therefore, we only track the cursor if 'withActionRegistry' exits + -- successfully, i.e. using 'delayedCommit'. + delayedCommit reg $ modifyMVar_ (sessionOpenCursors cursorSessionEnv) $ pure . Map.insert cursorId cursor pure $! cursor @@ -1030,10 +1030,10 @@ newCursor !offsetKey t = withOpenTable t $ \thEnv -> do RW.withReadAccess contentVar $ \content -> do let !wb = tableWriteBuffer content !wbblobs = tableWriteBufferBlobs content - wbblobs' <- allocateTemp reg (dupRef wbblobs) releaseRef + wbblobs' <- withRollback reg (dupRef wbblobs) releaseRef let runs = cachedRuns (tableCache content) runs' <- V.forM runs $ \r -> - allocateTemp reg (dupRef r) releaseRef + withRollback reg (dupRef r) releaseRef pure (wb, wbblobs', runs') {-# SPECIALISE closeCursor :: Cursor IO h -> IO () #-} @@ -1044,20 +1044,20 @@ closeCursor :: -> m () closeCursor Cursor {..} = do traceWith cursorTracer $ TraceCloseCursor - modifyWithTempRegistry_ (takeMVar cursorState) (putMVar cursorState) $ \reg -> \case + modifyWithActionRegistry_ (takeMVar cursorState) (putMVar cursorState) $ \reg -> \case CursorClosed -> return CursorClosed CursorOpen CursorEnv {..} -> do -- This should be safe-ish, but it's still not ideal, because it doesn't -- rule out sync exceptions in the cleanup operations. -- In that case, the cursor ends up closed, but resources might not have -- been freed. Probably better than the other way around, though. - freeTemp reg $ + delayedCommit reg $ modifyMVar_ (sessionOpenCursors cursorSessionEnv) $ pure . Map.delete cursorId - forM_ cursorReaders $ freeTemp reg . Readers.close - V.forM_ cursorRuns $ freeTemp reg . releaseRef - freeTemp reg (releaseRef cursorWBB) + forM_ cursorReaders $ delayedCommit reg . Readers.close + V.forM_ cursorRuns $ delayedCommit reg . releaseRef + delayedCommit reg (releaseRef cursorWBB) return CursorClosed {-# SPECIALISE readCursor :: @@ -1145,7 +1145,7 @@ createSnapshot resolve snap label tableType t = do traceWith (tableTracer t) $ TraceSnapshot snap let conf = tableConfig t withOpenTable t $ \thEnv -> - withTempRegistry $ \reg -> do -- TODO: use the temp registry for all side effects + withActionRegistry $ \reg -> do -- TODO: use the temp registry for all side effects let hfs = tableHasFS thEnv hbio = tableHasBlockIO thEnv @@ -1158,13 +1158,13 @@ createSnapshot resolve snap label tableType t = do else -- we assume the snapshots directory already exists, so we just have -- to create the directory for this specific snapshot. - allocateTemp reg + withRollback_ reg (FS.createDirectory hfs (Paths.getNamedSnapshotDir snapDir)) - (\_ -> FS.removeDirectoryRecursive hfs (Paths.getNamedSnapshotDir snapDir)) + (FS.removeDirectoryRecursive hfs (Paths.getNamedSnapshotDir snapDir)) -- For the temporary implementation it is okay to just flush the buffer -- before taking the snapshot. - content <- modifyWithTempRegistry + content <- modifyWithActionRegistry (RW.unsafeAcquireWriteAccess (tableContent thEnv)) (atomically . RW.unsafeReleaseWriteAccess (tableContent thEnv)) $ \innerReg content -> do @@ -1223,7 +1223,7 @@ openSnapshot :: openSnapshot sesh label tableType override snap resolve = do traceWith (sessionTracer sesh) $ TraceOpenSnapshot snap override withOpenSession sesh $ \seshEnv -> do - withTempRegistry $ \reg -> do + withActionRegistry $ \reg -> do let hfs = sessionHasFS seshEnv hbio = sessionHasBlockIO seshEnv @@ -1250,7 +1250,7 @@ openSnapshot sesh label tableType override snap resolve = do am <- newArenaManager blobpath <- Paths.tableBlobPath (sessionRoot seshEnv) <$> incrUniqCounter (sessionUniqCounter seshEnv) - tableWriteBufferBlobs <- allocateTemp reg (WBB.new hfs blobpath) + tableWriteBufferBlobs <- withRollback reg (WBB.new hfs blobpath) releaseRef let actDir = Paths.activeDir (sessionRoot seshEnv) @@ -1331,7 +1331,7 @@ duplicate t@Table{..} = do -- We acquire a read-lock on the session open-state to prevent races, see -- 'sessionOpenTables'. withOpenSession tableSession $ \_ -> do - withTempRegistry $ \reg -> do + withActionRegistry $ \reg -> do -- The table contents escape the read access, but we just added references -- to each run so it is safe. content <- RW.withReadAccess tableContent (duplicateTableContent reg) @@ -1384,7 +1384,7 @@ unions ts = do -- We acquire a read-lock on the session open-state to prevent races, see -- 'sessionOpenTables'. - modifyWithTempRegistry + modifyWithActionRegistry (atomically $ RW.unsafeAcquireReadAccess (sessionState sesh)) (\_ -> atomically $ RW.unsafeReleaseReadAccess (sessionState sesh)) $ \reg -> \case SessionClosed -> throwIO ErrSessionClosed diff --git a/src/Database/LSMTree/Internal/MergeSchedule.hs b/src/Database/LSMTree/Internal/MergeSchedule.hs index 85cf9bd46..5957bb4f2 100644 --- a/src/Database/LSMTree/Internal/MergeSchedule.hs +++ b/src/Database/LSMTree/Internal/MergeSchedule.hs @@ -27,13 +27,13 @@ module Database.LSMTree.Internal.MergeSchedule ( , creditThresholdForLevel ) where +import Control.ActionRegistry import Control.Concurrent.Class.MonadMVar.Strict import Control.Monad.Class.MonadST (MonadST) import Control.Monad.Class.MonadSTM (MonadSTM (..)) import Control.Monad.Class.MonadThrow (MonadMask, MonadThrow (..)) import Control.Monad.Primitive import Control.RefCount -import Control.TempRegistry import Control.Tracer import Data.BloomFilter (Bloom) import Data.Foldable (fold) @@ -58,7 +58,7 @@ import Database.LSMTree.Internal.RunNumber import Database.LSMTree.Internal.Serialise (SerialisedBlob, SerialisedKey, SerialisedValue) import Database.LSMTree.Internal.UniqCounter -import Database.LSMTree.Internal.Vector (mapStrict) +import Database.LSMTree.Internal.Vector (forMStrict, mapStrict) import Database.LSMTree.Internal.WriteBuffer (WriteBuffer) import qualified Database.LSMTree.Internal.WriteBuffer as WB import Database.LSMTree.Internal.WriteBufferBlobs (WriteBufferBlobs) @@ -117,26 +117,26 @@ data TableContent m h = TableContent { , tableCache :: !(LevelsCache m h) } -{-# SPECIALISE duplicateTableContent :: TempRegistry IO -> TableContent IO h -> IO (TableContent IO h) #-} +{-# SPECIALISE duplicateTableContent :: ActionRegistry IO -> TableContent IO h -> IO (TableContent IO h) #-} duplicateTableContent :: - (PrimMonad m, MonadMask m, MonadMVar m) - => TempRegistry m + (PrimMonad m, MonadMask m) + => ActionRegistry m -> TableContent m h -> m (TableContent m h) duplicateTableContent reg (TableContent wb wbb levels cache) = do - wbb' <- allocateTemp reg (dupRef wbb) releaseRef + wbb' <- withRollback reg (dupRef wbb) releaseRef levels' <- duplicateLevels reg levels cache' <- duplicateLevelsCache reg cache return $! TableContent wb wbb' levels' cache' -{-# SPECIALISE releaseTableContent :: TempRegistry IO -> TableContent IO h -> IO () #-} +{-# SPECIALISE releaseTableContent :: ActionRegistry IO -> TableContent IO h -> IO () #-} releaseTableContent :: - (PrimMonad m, MonadMask m, MonadMVar m) - => TempRegistry m + (PrimMonad m, MonadMask m) + => ActionRegistry m -> TableContent m h -> m () releaseTableContent reg (TableContent _wb wbb levels cache) = do - freeTemp reg (releaseRef wbb) + delayedCommit reg (releaseRef wbb) releaseLevels reg levels releaseLevelsCache reg cache @@ -166,7 +166,7 @@ data LevelsCache m h = LevelsCache_ { } {-# SPECIALISE mkLevelsCache :: - TempRegistry IO + ActionRegistry IO -> Levels IO h -> IO (LevelsCache IO h) #-} -- | Flatten the argument 'Level's into a single vector of runs, including all @@ -174,13 +174,13 @@ data LevelsCache m h = LevelsCache_ { -- 'LevelsCache'. The cache will take a reference for each of its runs. mkLevelsCache :: forall m h. (PrimMonad m, MonadMVar m, MonadMask m) - => TempRegistry m + => ActionRegistry m -> Levels m h -> m (LevelsCache m h) mkLevelsCache reg lvls = do rs <- foldRunAndMergeM (fmap V.singleton . dupRun) - (\mr -> allocateTemp reg (MR.duplicateRuns mr) (V.mapM_ releaseRef)) + (\mr -> withRollback reg (MR.duplicateRuns mr) (V.mapM_ releaseRef)) lvls pure $! LevelsCache_ { cachedRuns = rs @@ -189,7 +189,7 @@ mkLevelsCache reg lvls = do , cachedKOpsFiles = mapStrict (\(DeRef r) -> Run.runKOpsFile r) rs } where - dupRun r = allocateTemp reg (dupRef r) releaseRef + dupRun r = withRollback reg (dupRef r) releaseRef -- TODO: this is not terribly performant, but it is also not sure if we are -- going to need this in the end. We might get rid of the LevelsCache. @@ -200,14 +200,14 @@ mkLevelsCache reg lvls = do -> Levels m h -> m a foldRunAndMergeM k1 k2 ls = - fmap fold $ V.forM ls $ \(Level ir rs) -> do + fmap fold $ forMStrict ls $ \(Level ir rs) -> do incoming <- case ir of Single r -> k1 r Merging _ mr -> k2 mr (incoming <>) . fold <$> V.forM rs k1 {-# SPECIALISE rebuildCache :: - TempRegistry IO + ActionRegistry IO -> LevelsCache IO h -> Levels IO h -> IO (LevelsCache IO h) #-} @@ -233,7 +233,7 @@ mkLevelsCache reg lvls = do -- Lookups should no invalidate blob erferences. rebuildCache :: (PrimMonad m, MonadMVar m, MonadMask m) - => TempRegistry m + => ActionRegistry m -> LevelsCache m h -- ^ old cache -> Levels m h -- ^ new levels -> m (LevelsCache m h) -- ^ new cache @@ -242,31 +242,31 @@ rebuildCache reg oldCache newLevels = do mkLevelsCache reg newLevels {-# SPECIALISE duplicateLevelsCache :: - TempRegistry IO + ActionRegistry IO -> LevelsCache IO h -> IO (LevelsCache IO h) #-} duplicateLevelsCache :: - (PrimMonad m, MonadMask m, MonadMVar m) - => TempRegistry m + (PrimMonad m, MonadMask m) + => ActionRegistry m -> LevelsCache m h -> m (LevelsCache m h) duplicateLevelsCache reg cache = do - rs' <- V.forM (cachedRuns cache) $ \r -> - allocateTemp reg (dupRef r) releaseRef + rs' <- forMStrict (cachedRuns cache) $ \r -> + withRollback reg (dupRef r) releaseRef return cache { cachedRuns = rs' } {-# SPECIALISE releaseLevelsCache :: - TempRegistry IO + ActionRegistry IO -> LevelsCache IO h -> IO () #-} releaseLevelsCache :: - (PrimMonad m, MonadMVar m, MonadMask m) - => TempRegistry m + (PrimMonad m, MonadMask m) + => ActionRegistry m -> LevelsCache m h -> m () releaseLevelsCache reg cache = V.forM_ (cachedRuns cache) $ \r -> - freeTemp reg (releaseRef r) + delayedCommit reg (releaseRef r) {------------------------------------------------------------------------------- Levels, runs and ongoing merges @@ -293,52 +293,52 @@ mergePolicyForLevel MergePolicyLazyLevelling (LevelNo n) nextLevels | V.null nextLevels = LevelLevelling -- levelling on last level | otherwise = LevelTiering -{-# SPECIALISE duplicateLevels :: TempRegistry IO -> Levels IO h -> IO (Levels IO h) #-} +{-# SPECIALISE duplicateLevels :: ActionRegistry IO -> Levels IO h -> IO (Levels IO h) #-} duplicateLevels :: - (PrimMonad m, MonadMVar m, MonadMask m) - => TempRegistry m + (PrimMonad m, MonadMask m) + => ActionRegistry m -> Levels m h -> m (Levels m h) duplicateLevels reg levels = - V.forM levels $ \Level {incomingRun, residentRuns} -> do + forMStrict levels $ \Level {incomingRun, residentRuns} -> do incomingRun' <- duplicateIncomingRun reg incomingRun - residentRuns' <- V.forM residentRuns $ \r -> - allocateTemp reg (dupRef r) releaseRef + residentRuns' <- forMStrict residentRuns $ \r -> + withRollback reg (dupRef r) releaseRef return $! Level { incomingRun = incomingRun', residentRuns = residentRuns' } -{-# SPECIALISE releaseLevels :: TempRegistry IO -> Levels IO h -> IO () #-} +{-# SPECIALISE releaseLevels :: ActionRegistry IO -> Levels IO h -> IO () #-} releaseLevels :: - (PrimMonad m, MonadMVar m, MonadMask m) - => TempRegistry m + (PrimMonad m, MonadMask m) + => ActionRegistry m -> Levels m h -> m () releaseLevels reg levels = V.forM_ levels $ \Level {incomingRun, residentRuns} -> do releaseIncomingRun reg incomingRun - V.mapM_ (freeTemp reg . releaseRef) residentRuns + V.mapM_ (delayedCommit reg . releaseRef) residentRuns -{-# SPECIALISE duplicateIncomingRun :: TempRegistry IO -> IncomingRun IO h -> IO (IncomingRun IO h) #-} +{-# SPECIALISE duplicateIncomingRun :: ActionRegistry IO -> IncomingRun IO h -> IO (IncomingRun IO h) #-} duplicateIncomingRun :: - (PrimMonad m, MonadMask m, MonadMVar m) - => TempRegistry m + (PrimMonad m, MonadMask m) + => ActionRegistry m -> IncomingRun m h -> m (IncomingRun m h) duplicateIncomingRun reg (Single r) = - Single <$> allocateTemp reg (dupRef r) releaseRef + Single <$> withRollback reg (dupRef r) releaseRef duplicateIncomingRun reg (Merging mp mr) = - Merging mp <$> allocateTemp reg (dupRef mr) releaseRef + Merging mp <$> withRollback reg (dupRef mr) releaseRef -{-# SPECIALISE releaseIncomingRun :: TempRegistry IO -> IncomingRun IO h -> IO () #-} +{-# SPECIALISE releaseIncomingRun :: ActionRegistry IO -> IncomingRun IO h -> IO () #-} releaseIncomingRun :: - (PrimMonad m, MonadMask m, MonadMVar m) - => TempRegistry m + (PrimMonad m, MonadMask m) + => ActionRegistry m -> IncomingRun m h -> m () -releaseIncomingRun reg (Single r) = freeTemp reg (releaseRef r) -releaseIncomingRun reg (Merging _ mr) = freeTemp reg (releaseRef mr) +releaseIncomingRun reg (Single r) = delayedCommit reg (releaseRef r) +releaseIncomingRun reg (Merging _ mr) = delayedCommit reg (releaseRef mr) {-# SPECIALISE iforLevelM_ :: Levels IO h -> (LevelNo -> Level IO h -> IO ()) -> IO () #-} iforLevelM_ :: Monad m => Levels m h -> (LevelNo -> Level m h -> m ()) -> m () @@ -357,7 +357,7 @@ iforLevelM_ lvls k = V.iforM_ lvls $ \i lvl -> k (LevelNo (i + 1)) lvl -> SessionRoot -> UniqCounter IO -> V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob) - -> TempRegistry IO + -> ActionRegistry IO -> TableContent IO h -> IO (TableContent IO h) #-} -- | A single batch of updates can fill up the write buffer multiple times. We @@ -395,7 +395,7 @@ updatesWithInterleavedFlushes :: -> SessionRoot -> UniqCounter m -> V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob) - -> TempRegistry m + -> ActionRegistry m -> TableContent m h -> m (TableContent m h) updatesWithInterleavedFlushes tr conf resolve hfs hbio root uc es reg tc = do @@ -474,7 +474,7 @@ addWriteBufferEntries hfs f wbblobs maxn = -> HasBlockIO IO h -> SessionRoot -> UniqCounter IO - -> TempRegistry IO + -> ActionRegistry IO -> TableContent IO h -> IO (TableContent IO h) #-} -- | Flush the write buffer to disk, regardless of whether it is full or not. @@ -490,7 +490,7 @@ flushWriteBuffer :: -> HasBlockIO m h -> SessionRoot -> UniqCounter m - -> TempRegistry m + -> ActionRegistry m -> TableContent m h -> m (TableContent m h) flushWriteBuffer tr conf@TableConfig{confDiskCachePolicy} @@ -504,7 +504,7 @@ flushWriteBuffer tr conf@TableConfig{confDiskCachePolicy} !alloc = bloomFilterAllocForLevel conf l !path = Paths.runPath root (uniqueToRunNumber n) traceWith tr $ AtLevel l $ TraceFlushWriteBuffer size (runNumber path) cache alloc - r <- allocateTemp reg + r <- withRollback reg (Run.fromWriteBuffer hfs hbio cache alloc @@ -512,8 +512,8 @@ flushWriteBuffer tr conf@TableConfig{confDiskCachePolicy} (tableWriteBuffer tc) (tableWriteBufferBlobs tc)) releaseRef - freeTemp reg (releaseRef (tableWriteBufferBlobs tc)) - wbblobs' <- allocateTemp reg (WBB.new hfs (Paths.tableBlobPath root n)) + delayedCommit reg (releaseRef (tableWriteBufferBlobs tc)) + wbblobs' <- withRollback reg (WBB.new hfs (Paths.tableBlobPath root n)) releaseRef levels' <- addRunToLevels tr conf resolve hfs hbio root uc r reg (tableLevels tc) tableCache' <- rebuildCache reg (tableCache tc) levels' @@ -533,7 +533,7 @@ flushWriteBuffer tr conf@TableConfig{confDiskCachePolicy} -> SessionRoot -> UniqCounter IO -> Ref (Run IO h) - -> TempRegistry IO + -> ActionRegistry IO -> Levels IO h -> IO (Levels IO h) #-} -- | Add a run to the levels, and propagate merges. @@ -551,7 +551,7 @@ addRunToLevels :: -> SessionRoot -> UniqCounter m -> Ref (Run m h) - -> TempRegistry m + -> ActionRegistry m -> Levels m h -> m (Levels m h) addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = do @@ -620,8 +620,8 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = r <- case ir of Single r -> pure r Merging _ mr -> do - r <- allocateTemp reg (MR.expectCompleted mr) releaseRef - freeTemp reg (releaseRef mr) + r <- withRollback reg (MR.expectCompleted mr) releaseRef + delayedCommit reg (releaseRef mr) pure r traceWith tr $ AtLevel ln $ TraceExpectCompletedMerge (Run.runFsPathsNumber r) @@ -641,8 +641,8 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = (Run.runFsPathsNumber r) -- We create a fresh reference and release the original one. -- This will also make it easier to trace back where it was allocated. - ir <- Single <$> allocateTemp reg (dupRef r) releaseRef - freeTemp reg (releaseRef r) + ir <- Single <$> withRollback reg (dupRef r) releaseRef + delayedCommit reg (releaseRef r) pure ir | otherwise = do @@ -655,10 +655,10 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = TraceNewMerge (V.map Run.size rs) (runNumber runPaths) caching alloc mergePolicy mergeLevel -- The runs will end up inside the merging run, with fresh references. -- The original references can be released (but only on the happy path). - mr <- allocateTemp reg + mr <- withRollback reg (MR.new hfs hbio resolve caching alloc mergeLevel runPaths rs) releaseRef - V.forM_ rs $ \r -> freeTemp reg (releaseRef r) + V.forM_ rs $ \r -> delayedCommit reg (releaseRef r) case confMergeSchedule of Incremental -> pure () OneShot -> do diff --git a/src/Database/LSMTree/Internal/MergingRun.hs b/src/Database/LSMTree/Internal/MergingRun.hs index cf95aff51..6e4dfac45 100644 --- a/src/Database/LSMTree/Internal/MergingRun.hs +++ b/src/Database/LSMTree/Internal/MergingRun.hs @@ -23,6 +23,7 @@ module Database.LSMTree.Internal.MergingRun ( , MergeKnownCompleted (..) ) where +import Control.ActionRegistry import Control.Concurrent.Class.MonadMVar.Strict import Control.DeepSeq (NFData (..)) import Control.Monad (void, when) @@ -32,7 +33,6 @@ import Control.Monad.Class.MonadThrow (MonadCatch (bracketOnError), MonadMask) import Control.Monad.Primitive import Control.RefCount -import Control.TempRegistry import Data.Maybe (fromMaybe) import Data.Primitive.MutVar import Data.Primitive.PrimVar @@ -140,8 +140,8 @@ new :: -> m (Ref (MergingRun m h)) new hfs hbio resolve caching alloc mergeLevel runPaths inputRuns = -- If creating the Merge fails, we must release the references again. - withTempRegistry $ \reg -> do - runs <- V.mapM (\r -> allocateTemp reg (dupRef r) releaseRef) inputRuns + withActionRegistry $ \reg -> do + runs <- V.mapM (\r -> withRollback reg (dupRef r) releaseRef) inputRuns merge <- fromMaybe (error "newMerge: merges can not be empty") <$> Merge.new hfs hbio caching alloc mergeLevel resolve runPaths runs let numInputRuns = NumRuns $ V.length runs @@ -219,8 +219,8 @@ duplicateRuns (DeRef mr) = -- does not get completed concurrently before we are done. withMVar (mergeState mr) $ \case CompletedMerge r -> V.singleton <$> dupRef r - OngoingMerge rs _ _ -> withTempRegistry $ \reg -> - V.mapM (\r -> allocateTemp reg (dupRef r) releaseRef) rs + OngoingMerge rs _ _ -> withActionRegistry $ \reg -> + V.mapM (\r -> withRollback reg (dupRef r) releaseRef) rs {------------------------------------------------------------------------------- Credits diff --git a/src/Database/LSMTree/Internal/Snapshot.hs b/src/Database/LSMTree/Internal/Snapshot.hs index c6307318f..b94492f06 100644 --- a/src/Database/LSMTree/Internal/Snapshot.hs +++ b/src/Database/LSMTree/Internal/Snapshot.hs @@ -23,6 +23,7 @@ module Database.LSMTree.Internal.Snapshot ( , hardLinkRunFiles ) where +import Control.ActionRegistry import Control.Concurrent.Class.MonadMVar.Strict import Control.Concurrent.Class.MonadSTM (MonadSTM) import Control.DeepSeq (NFData (..)) @@ -31,7 +32,6 @@ import Control.Monad.Class.MonadST (MonadST) import Control.Monad.Class.MonadThrow (MonadMask) import Control.Monad.Primitive (PrimMonad) import Control.RefCount -import Control.TempRegistry import Data.Foldable (sequenceA_, traverse_) import Data.Primitive.PrimVar import Data.Text (Text) @@ -215,7 +215,7 @@ toSnapMergingRunState (MR.OngoingMerge rs (MR.SpentCreditsVar spentCreditsVar) m -------------------------------------------------------------------------------} {-# SPECIALISE snapshotRuns :: - TempRegistry IO + ActionRegistry IO -> HasBlockIO IO h -> NamedSnapshotDir -> SnapLevels (Ref (Run IO h)) @@ -225,8 +225,8 @@ toSnapMergingRunState (MR.OngoingMerge rs (MR.SpentCreditsVar spentCreditsVar) m -- the @targetDir@ directory. The hard links and the @targetDir@ are made -- durable on disk. snapshotRuns :: - (MonadMask m, MonadMVar m) - => TempRegistry m + (MonadMask m, PrimMonad m) + => ActionRegistry m -> HasBlockIO m h -> NamedSnapshotDir -> SnapLevels (Ref (Run m h)) @@ -245,7 +245,7 @@ snapshotRuns reg hbio0 (NamedSnapshotDir targetDir) levels = do pure levels' {-# SPECIALISE openRuns :: - TempRegistry IO + ActionRegistry IO -> HasFS IO h -> HasBlockIO IO h -> TableConfig @@ -263,7 +263,7 @@ snapshotRuns reg hbio0 (NamedSnapshotDir targetDir) levels = do -- The result must ultimately be released using 'releaseRuns'. openRuns :: (MonadMask m, MonadSTM m, MonadST m, MonadMVar m) - => TempRegistry m + => ActionRegistry m -> HasFS m h -> HasBlockIO m h -> TableConfig @@ -285,25 +285,25 @@ openRuns let targetPaths = RunFsPaths targetDir runNum' hardLinkRunFiles reg hfs hbio NoHardLinkDurable sourcePaths targetPaths - allocateTemp reg + withRollback reg (Run.openFromDisk hfs hbio caching targetPaths) releaseRef pure (SnapLevels levels') {-# SPECIALISE releaseRuns :: - TempRegistry IO -> SnapLevels (Ref (Run IO h)) -> IO () + ActionRegistry IO -> SnapLevels (Ref (Run IO h)) -> IO () #-} releaseRuns :: - (MonadMask m, MonadST m, MonadMVar m) - => TempRegistry m -> SnapLevels (Ref (Run m h)) -> m () -releaseRuns reg = traverse_ $ \r -> freeTemp reg (releaseRef r) + (MonadMask m, MonadST m) + => ActionRegistry m -> SnapLevels (Ref (Run m h)) -> m () +releaseRuns reg = traverse_ $ \r -> delayedCommit reg (releaseRef r) {------------------------------------------------------------------------------- Opening from levels snapshot format -------------------------------------------------------------------------------} {-# SPECIALISE fromSnapLevels :: - TempRegistry IO + ActionRegistry IO -> HasFS IO h -> HasBlockIO IO h -> TableConfig @@ -316,7 +316,7 @@ releaseRuns reg = traverse_ $ \r -> freeTemp reg (releaseRef r) -- | Duplicates runs and re-creates merging runs. fromSnapLevels :: forall m h. (MonadMask m, MonadMVar m, MonadSTM m, MonadST m) - => TempRegistry m + => ActionRegistry m -> HasFS m h -> HasBlockIO m h -> TableConfig @@ -347,11 +347,11 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve fromSnapIncomingRun (SnapMergingRun mpfl nr ne unspentCredits smrs) = do Merging mpfl <$> case smrs of SnapCompletedMerge run -> - allocateTemp reg (MR.newCompleted nr ne run) releaseRef + withRollback reg (MR.newCompleted nr ne run) releaseRef SnapOngoingMerge runs spentCredits lvl -> do rn <- uniqueToRunNumber <$> incrUniqCounter uc - mr <- allocateTemp reg + mr <- withRollback reg (MR.new hfs hbio resolve caching alloc lvl (mkPath rn) runs) releaseRef -- When a snapshot is created, merge progress is lost, so we @@ -363,7 +363,7 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve MR.supplyCredits (MR.Credits c) (creditThresholdForLevel conf ln) mr return mr - dupRun r = allocateTemp reg (dupRef r) releaseRef + dupRun r = withRollback reg (dupRef r) releaseRef {------------------------------------------------------------------------------- Hard links @@ -373,7 +373,7 @@ data HardLinkDurable = HardLinkDurable | NoHardLinkDurable deriving stock Eq {-# SPECIALISE hardLinkRunFiles :: - TempRegistry IO + ActionRegistry IO -> HasFS IO h -> HasBlockIO IO h -> HardLinkDurable @@ -385,8 +385,8 @@ data HardLinkDurable = HardLinkDurable | NoHardLinkDurable -- name for the new directory entry. If @dur == HardLinkDurabl@, the links will -- also be made durable on disk. hardLinkRunFiles :: - (MonadMask m, MonadMVar m) - => TempRegistry m + (MonadMask m, PrimMonad m) + => ActionRegistry m -> HasFS m h -> HasBlockIO m h -> HardLinkDurable @@ -400,7 +400,7 @@ hardLinkRunFiles reg hfs hbio dur sourceRunFsPaths targetRunFsPaths = do hardLinkTemp (runChecksumsPath sourceRunFsPaths) (runChecksumsPath targetRunFsPaths) where hardLinkTemp sourcePath targetPath = do - allocateTemp reg + withRollback reg (FS.createHardLink hbio sourcePath targetPath) (\_ -> FS.removeFile hfs targetPath) when (dur == HardLinkDurable) $ diff --git a/src/Database/LSMTree/Internal/Vector.hs b/src/Database/LSMTree/Internal/Vector.hs index 618eb7f7f..e59639c6d 100644 --- a/src/Database/LSMTree/Internal/Vector.hs +++ b/src/Database/LSMTree/Internal/Vector.hs @@ -9,6 +9,7 @@ module Database.LSMTree.Internal.Vector ( mapStrict, mapMStrict, imapMStrict, + forMStrict, zipWithStrict, binarySearchL, unsafeInsertWithMStrict, @@ -79,6 +80,11 @@ imapMStrict f v = V.imapM (\i -> f i >=> (pure $!)) v zipWithStrict :: forall a b c. (a -> b -> c) -> V.Vector a -> V.Vector b -> V.Vector c zipWithStrict f xs ys = runST (V.zipWithM (\x y -> pure $! f x y) xs ys) +-- | /( O(n) /) Like 'V.forM', but strict in the produced elements of type @b@. +{-# INLINE forMStrict #-} +forMStrict :: Monad m => V.Vector a -> (a -> m b) -> m (V.Vector b) +forMStrict xs f = V.forM xs (f >=> (pure $!)) + {-| Finds the lowest index in a given sorted vector at which the given element could be inserted while maintaining the sortedness. diff --git a/test/Test/Database/LSMTree/Internal/Run.hs b/test/Test/Database/LSMTree/Internal/Run.hs index 9612bd722..6d77a3a36 100644 --- a/test/Test/Database/LSMTree/Internal/Run.hs +++ b/test/Test/Database/LSMTree/Internal/Run.hs @@ -22,8 +22,8 @@ import Test.Tasty (TestTree, testGroup) import Test.Tasty.HUnit (assertEqual, testCase, (@=?), (@?)) import Test.Tasty.QuickCheck +import Control.ActionRegistry (withActionRegistry) import Control.RefCount -import Control.TempRegistry (withTempRegistry) import Database.LSMTree.Extras.Generators (KeyForIndexCompact (..)) import Database.LSMTree.Extras.RunData import Database.LSMTree.Internal.BlobRef (BlobSpan (..)) @@ -187,7 +187,7 @@ prop_WriteAndOpen :: -> IO Property prop_WriteAndOpen fs hbio wb = withRun fs hbio (simplePath 1337) (serialiseRunData wb) $ \written -> - withTempRegistry $ \reg -> do + withActionRegistry $ \reg -> do let paths = Run.runFsPaths written paths' = paths { runNumber = RunNumber 17} hardLinkRunFiles reg fs hbio NoHardLinkDurable paths paths'