Skip to content

Commit

Permalink
restore completed MergingRun snapshot through safe interface
Browse files Browse the repository at this point in the history
This was the last place that required unsafeNew. This commit also
removes MergeKnownCompleted from the snapshot format, which previously
allowed to represent inconsistent states. It can instead be
re-constructed from the MergingRunState.
  • Loading branch information
mheinzel committed Dec 12, 2024
1 parent 62c20c5 commit 0d42caa
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 54 deletions.
19 changes: 14 additions & 5 deletions src/Database/LSMTree/Internal/MergingRun.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
module Database.LSMTree.Internal.MergingRun (
MergingRun (..)
, new
, unsafeNew
, newCompleted
, duplicateRuns
, supplyCredits
, expectCompleted
Expand Down Expand Up @@ -122,10 +122,19 @@ new mergePolicy runs merge = do
unsafeNew mergePolicy numInputRuns numInputEntries MergeMaybeCompleted $
OngoingMerge runs spentCreditsVar merge

{-# SPECIALISE unsafeNew :: MergePolicyForLevel -> NumRuns -> NumEntries -> MergeKnownCompleted -> MergingRunState IO h -> IO (Ref (MergingRun IO h)) #-}
-- | This allows constructing ill-formed MergingRuns, but the flexibility is
-- needed for creating a merging run that is already Completed, as well as
-- opening a merging run from a snapshot.
{-# SPECIALISE newCompleted :: MergePolicyForLevel -> NumRuns -> NumEntries -> Ref (Run IO h) -> IO (Ref (MergingRun IO h)) #-}
newCompleted ::
(MonadMVar m, MonadMask m, MonadSTM m, MonadST m)
=> MergePolicyForLevel
-> NumRuns
-> NumEntries
-> Ref (Run m h)
-> m (Ref (MergingRun m h))
newCompleted mergePolicy numInputRuns numInputEntries run = do
unsafeNew mergePolicy numInputRuns numInputEntries MergeMaybeCompleted $
CompletedMerge run

{-# INLINE unsafeNew #-}
unsafeNew ::
(MonadMVar m, MonadMask m, MonadSTM m, MonadST m)
=> MergePolicyForLevel
Expand Down
71 changes: 31 additions & 40 deletions src/Database/LSMTree/Internal/Snapshot.hs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import Control.Monad.Primitive (PrimMonad)
import Control.RefCount
import Control.TempRegistry
import Data.Foldable (sequenceA_)
import Data.Primitive (readMutVar)
import Data.Primitive.PrimVar
import Data.Text (Text)
import Data.Traversable (for)
Expand Down Expand Up @@ -124,13 +123,13 @@ instance NFData r => NFData (SnapLevel r) where
rnf (SnapLevel a b) = rnf a `seq` rnf b

data SnapIncomingRun r =
SnapMergingRun !MergePolicyForLevel !NumRuns !NumEntries !UnspentCredits !MR.MergeKnownCompleted !(SnapMergingRunState r)
SnapMergingRun !MergePolicyForLevel !NumRuns !NumEntries !UnspentCredits !(SnapMergingRunState r)
| SnapSingleRun !r
deriving stock (Show, Eq, Functor, Foldable, Traversable)

instance NFData r => NFData (SnapIncomingRun r) where
rnf (SnapMergingRun a b c d e f) =
rnf a `seq` rnf b `seq` rnf c `seq` rnf d `seq` rnf e `seq` rnf f
rnf (SnapMergingRun a b c d e) =
rnf a `seq` rnf b `seq` rnf c `seq` rnf d `seq` rnf e
rnf (SnapSingleRun a) = rnf a

-- | The total number of unspent credits. This total is used in combination with
Expand Down Expand Up @@ -187,15 +186,13 @@ toSnapIncomingRun (Single r) = pure (SnapSingleRun r)
-- here, since we still start counting from 0 again when loading the snapshot.
toSnapIncomingRun (Merging (DeRef MR.MergingRun {..})) = do
unspentCredits <- readPrimVar (MR.getUnspentCreditsVar mergeUnspentCredits)
mergeCompletedCache <- readMutVar mergeKnownCompleted
smrs <- withMVar mergeState $ \mrs -> toSnapMergingRunState mrs
pure $
SnapMergingRun
mergePolicy
mergeNumRuns
mergeNumEntries
(UnspentCredits unspentCredits)
mergeCompletedCache
smrs

{-# SPECIALISE toSnapMergingRunState ::
Expand Down Expand Up @@ -337,40 +334,34 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve
-> m (IncomingRun m h)
fromSnapIncomingRun (SnapSingleRun run) =
pure (Single run)
fromSnapIncomingRun (SnapMergingRun mpfl nr ne unspentCredits knownCompleted smrs) = do
(spentCreditsMay, mrs) <- fromSnapMergingRunState smrs
mr <- MR.unsafeNew mpfl nr ne knownCompleted mrs

-- When a snapshot is created, merge progress is lost, so we have to
-- redo merging work here. UnspentCredits and SpentCredits track how
-- many credits were supplied before the snapshot was taken.
--
-- TODO: this use of supplyMergeCredits is leaky! If a merge completes
-- in supplyMergeCredits, then the resulting run is not tracked in the
-- registry, and closing the input runs is also not tracked in the
-- registry. Note, however, that this bit of code is likely to change in
-- #392.
let c = getUnspentCredits unspentCredits
+ maybe 0 getSpentCredits spentCreditsMay
MR.supplyCredits (MR.Credits c) (creditThresholdForLevel conf ln) mr
return (Merging mr)

fromSnapMergingRunState ::
SnapMergingRunState (Ref (Run m h))
-> m (Maybe SpentCredits, MR.MergingRunState m h)
fromSnapMergingRunState (SnapCompletedMerge run) =
pure (Nothing, MR.CompletedMerge run)
fromSnapMergingRunState (SnapOngoingMerge runs spentCredits mergeLast) = do
-- Initialise the variable with 0. Credits will be re-supplied later,
-- which will ensure that this variable is updated.
spentCreditsVar <- MR.SpentCreditsVar <$> newPrimVar 0
rn <- uniqueToRunNumber <$> incrUniqCounter uc
mergeMaybe <- allocateMaybeTemp reg
(Merge.new hfs hbio caching alloc mergeLast resolve (mkPath rn) runs)
Merge.abort
case mergeMaybe of
Nothing -> error "openLevels: merges can not be empty"
Just m -> pure (Just spentCredits, MR.OngoingMerge runs spentCreditsVar m)
fromSnapIncomingRun (SnapMergingRun mpfl nr ne unspentCredits smrs) = do
Merging <$> case smrs of
SnapCompletedMerge run ->
MR.newCompleted mpfl nr ne run

SnapOngoingMerge runs spentCredits mergeLast -> do
rn <- uniqueToRunNumber <$> incrUniqCounter uc
mergeMaybe <- allocateMaybeTemp reg
(Merge.new hfs hbio caching alloc mergeLast resolve (mkPath rn) runs)
Merge.abort
mr <- case mergeMaybe of
Nothing -> error "openLevels: merges can not be empty"
Just m -> MR.new mpfl runs m

-- When a snapshot is created, merge progress is lost, so we
-- have to redo merging work here. UnspentCredits and
-- SpentCredits track how many credits were supplied before the
-- snapshot was taken.
--
-- TODO: this use of supplyMergeCredits is leaky! If a merge
-- completes in supplyMergeCredits, then the resulting run is
-- not tracked in the registry, and closing the input runs is
-- also not tracked in the registry. Note, however, that this
-- bit of code is likely to change in #392.
let c = getUnspentCredits unspentCredits
+ getSpentCredits spentCredits
MR.supplyCredits (MR.Credits c) (creditThresholdForLevel conf ln) mr
return mr

{-------------------------------------------------------------------------------
Hard links
Expand Down
9 changes: 4 additions & 5 deletions src/Database/LSMTree/Internal/Snapshot/Codec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -455,14 +455,13 @@ instance DecodeVersioned RunNumber where
-- SnapIncomingRun

instance Encode (SnapIncomingRun RunNumber) where
encode (SnapMergingRun mpfl nr ne uc mkc smrs) =
encodeListLen 7
encode (SnapMergingRun mpfl nr ne uc smrs) =
encodeListLen 6
<> encodeWord 0
<> encode mpfl
<> encode nr
<> encode ne
<> encode uc
<> encode mkc
<> encode smrs
encode (SnapSingleRun x) =
encodeListLen 2
Expand All @@ -474,9 +473,9 @@ instance DecodeVersioned (SnapIncomingRun RunNumber) where
n <- decodeListLen
tag <- decodeWord
case (n, tag) of
(7, 0) -> SnapMergingRun <$>
(6, 0) -> SnapMergingRun <$>
decodeVersioned v <*> decodeVersioned v <*> decodeVersioned v <*>
decodeVersioned v <*> decodeVersioned v <*> decodeVersioned v
decodeVersioned v <*> decodeVersioned v
(2, 1) -> SnapSingleRun <$> decodeVersioned v
_ -> fail ("[SnapMergingRun] Unexpected combination of list length and tag: " <> show (n, tag))

Expand Down
8 changes: 4 additions & 4 deletions test/Test/Database/LSMTree/Internal/Snapshot/Codec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -287,12 +287,12 @@ deriving newtype instance Arbitrary RunNumber
instance Arbitrary (SnapIncomingRun RunNumber) where
arbitrary = oneof [
SnapMergingRun <$> arbitrary <*> arbitrary <*> arbitrary
<*> arbitrary <*> arbitrary <*> arbitrary
<*> arbitrary <*> arbitrary
, SnapSingleRun <$> arbitrary
]
shrink (SnapMergingRun a b c d e f) =
[ SnapMergingRun a' b' c' d' e' f'
| (a', b', c', d', e', f') <- shrink (a, b, c, d, e, f) ]
shrink (SnapMergingRun a b c d e) =
[ SnapMergingRun a' b' c' d' e'
| (a', b', c', d', e') <- shrink (a, b, c, d, e) ]
shrink (SnapSingleRun a) = SnapSingleRun <$> shrink a

deriving newtype instance Arbitrary NumRuns
Expand Down

0 comments on commit 0d42caa

Please sign in to comment.