Skip to content

Commit

Permalink
restore completed MergingRun snapshot through safe interface
Browse files Browse the repository at this point in the history
This was the last place that required unsafeNew. This commit also
removes MergeKnownCompleted from the snapshot format, which previously
allowed inconsistent states to be represented. It can instead be
reconstructed from the MergingRunState.
  • Loading branch information
mheinzel committed Dec 14, 2024
1 parent 5284b6f commit 9c87411
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 74 deletions.
21 changes: 16 additions & 5 deletions src/Database/LSMTree/Internal/MergingRun.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
module Database.LSMTree.Internal.MergingRun (
MergingRun (..)
, new
, unsafeNew
, newCompleted
, duplicateRuns
, supplyCredits
, expectCompleted
Expand Down Expand Up @@ -120,10 +120,21 @@ new mergePolicy runs merge = do
unsafeNew mergePolicy numInputRuns numInputEntries MergeMaybeCompleted $
OngoingMerge runs spentCreditsVar merge

{-# SPECIALISE unsafeNew :: MergePolicyForLevel -> NumRuns -> NumEntries -> MergeKnownCompleted -> MergingRunState IO h -> IO (Ref (MergingRun IO h)) #-}
-- | This allows constructing ill-formed MergingRuns, but the flexibility is
-- needed for creating a merging run that is already Completed, as well as
-- opening a merging run from a snapshot.
{-# SPECIALISE newCompleted :: MergePolicyForLevel -> NumRuns -> NumEntries -> Ref (Run IO h) -> IO (Ref (MergingRun IO h)) #-}
-- | Build a 'MergingRun' around a run whose merge has already finished.
--
-- The merge policy, number of input runs and number of input entries are
-- passed through to 'unsafeNew' unchanged, together with
-- 'MergeKnownCompleted' and a 'CompletedMerge' state holding the given run.
--
-- The returned reference must ultimately be released via 'releaseRef'.
newCompleted ::
     (MonadMVar m, MonadMask m, MonadSTM m, MonadST m)
  => MergePolicyForLevel
  -> NumRuns
  -> NumEntries
  -> Ref (Run m h)
  -> m (Ref (MergingRun m h))
newCompleted mergePolicy numInputRuns numInputEntries run =
    unsafeNew
      mergePolicy
      numInputRuns
      numInputEntries
      MergeKnownCompleted
      completedState
  where
    -- The only state this constructor ever produces.
    completedState = CompletedMerge run

{-# INLINE unsafeNew #-}
unsafeNew ::
(MonadMVar m, MonadMask m, MonadSTM m, MonadST m)
=> MergePolicyForLevel
Expand Down
70 changes: 30 additions & 40 deletions src/Database/LSMTree/Internal/Snapshot.hs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import Control.Monad.Primitive (PrimMonad)
import Control.RefCount
import Control.TempRegistry
import Data.Foldable (sequenceA_)
import Data.Primitive (readMutVar)
import Data.Primitive.PrimVar
import Data.Text (Text)
import Data.Traversable (for)
Expand Down Expand Up @@ -124,13 +123,13 @@ instance NFData r => NFData (SnapLevel r) where
rnf (SnapLevel a b) = rnf a `seq` rnf b

data SnapIncomingRun r =
SnapMergingRun !MergePolicyForLevel !NumRuns !NumEntries !UnspentCredits !MR.MergeKnownCompleted !(SnapMergingRunState r)
SnapMergingRun !MergePolicyForLevel !NumRuns !NumEntries !UnspentCredits !(SnapMergingRunState r)
| SnapSingleRun !r
deriving stock (Show, Eq, Functor, Foldable, Traversable)

instance NFData r => NFData (SnapIncomingRun r) where
rnf (SnapMergingRun a b c d e f) =
rnf a `seq` rnf b `seq` rnf c `seq` rnf d `seq` rnf e `seq` rnf f
rnf (SnapMergingRun a b c d e) =
rnf a `seq` rnf b `seq` rnf c `seq` rnf d `seq` rnf e
rnf (SnapSingleRun a) = rnf a

-- | The total number of unspent credits. This total is used in combination with
Expand Down Expand Up @@ -187,15 +186,13 @@ toSnapIncomingRun (Single r) = pure (SnapSingleRun r)
-- here, since we still start counting from 0 again when loading the snapshot.
toSnapIncomingRun (Merging (DeRef MR.MergingRun {..})) = do
unspentCredits <- readPrimVar (MR.getUnspentCreditsVar mergeUnspentCredits)
mergeCompletedCache <- readMutVar mergeKnownCompleted
smrs <- withMVar mergeState $ \mrs -> toSnapMergingRunState mrs
pure $
SnapMergingRun
mergePolicy
mergeNumRuns
mergeNumEntries
(UnspentCredits unspentCredits)
mergeCompletedCache
smrs

{-# SPECIALISE toSnapMergingRunState ::
Expand Down Expand Up @@ -337,40 +334,33 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve
-> m (IncomingRun m h)
fromSnapIncomingRun (SnapSingleRun run) =
pure (Single run)
fromSnapIncomingRun (SnapMergingRun mpfl nr ne unspentCredits knownCompleted smrs) = do
(spentCreditsMay, mrs) <- fromSnapMergingRunState smrs
mr <- MR.unsafeNew mpfl nr ne knownCompleted mrs

-- When a snapshot is created, merge progress is lost, so we have to
-- redo merging work here. UnspentCredits and SpentCredits track how
-- many credits were supplied before the snapshot was taken.
--
-- TODO: this use of supplyMergeCredits is leaky! If a merge completes
-- in supplyMergeCredits, then the resulting run is not tracked in the
-- registry, and closing the input runs is also not tracked in the
-- registry. Note, however, that this bit of code is likely to change in
-- #392.
let c = getUnspentCredits unspentCredits
+ maybe 0 getSpentCredits spentCreditsMay
MR.supplyCredits (MR.Credits c) (creditThresholdForLevel conf ln) mr
return (Merging mr)

fromSnapMergingRunState ::
SnapMergingRunState (Ref (Run m h))
-> m (Maybe SpentCredits, MR.MergingRunState m h)
fromSnapMergingRunState (SnapCompletedMerge run) =
pure (Nothing, MR.CompletedMerge run)
fromSnapMergingRunState (SnapOngoingMerge runs spentCredits mergeLast) = do
-- Initialise the variable with 0. Credits will be re-supplied later,
-- which will ensure that this variable is updated.
spentCreditsVar <- MR.SpentCreditsVar <$> newPrimVar 0
rn <- uniqueToRunNumber <$> incrUniqCounter uc
mergeMaybe <- allocateMaybeTemp reg
(Merge.new hfs hbio caching alloc mergeLast resolve (mkPath rn) runs)
Merge.abort
case mergeMaybe of
Nothing -> error "openLevels: merges can not be empty"
Just m -> pure (Just spentCredits, MR.OngoingMerge runs spentCreditsVar m)
fromSnapIncomingRun (SnapMergingRun mpfl nr ne unspentCredits smrs) =
    fmap Merging $ case smrs of
      -- A merge that was already complete in the snapshot is rebuilt
      -- directly around its result run; no credits are supplied here.
      SnapCompletedMerge run ->
        MR.newCompleted mpfl nr ne run

      -- A merge that was still ongoing is started afresh from its input
      -- runs, under a newly drawn run number.
      SnapOngoingMerge runs spentCredits mergeLast -> do
        runNumber <- fmap uniqueToRunNumber (incrUniqCounter uc)
        newMerge <- allocateMaybeTemp reg
          (Merge.new hfs hbio caching alloc mergeLast resolve (mkPath runNumber) runs)
          Merge.abort
        mergingRun <-
          maybe (error "openLevels: merges can not be empty")
                (MR.new mpfl runs)
                newMerge

        -- Taking a snapshot discards merge progress, so the merging work
        -- has to be redone. The unspent and spent credit counts stored in
        -- the snapshot record how many credits had been supplied before it
        -- was taken; their sum is supplied again to the new merging run.
        --
        -- TODO: this use of supplyCredits is leaky! If a merge completes
        -- in supplyCredits, then the resulting run is not tracked in the
        -- registry, and closing the input runs is also not tracked in the
        -- registry.
        let resupplied =
              MR.Credits
                (getUnspentCredits unspentCredits + getSpentCredits spentCredits)
        MR.supplyCredits resupplied (creditThresholdForLevel conf ln) mergingRun
        pure mergingRun

{-------------------------------------------------------------------------------
Hard links
Expand Down
24 changes: 4 additions & 20 deletions src/Database/LSMTree/Internal/Snapshot/Codec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import Database.LSMTree.Internal.Entry
import qualified Database.LSMTree.Internal.Merge as Merge
import Database.LSMTree.Internal.MergeSchedule
import Database.LSMTree.Internal.MergingRun (NumRuns (..))
import qualified Database.LSMTree.Internal.MergingRun as MR
import Database.LSMTree.Internal.Run (ChecksumError (..),
FileFormatError (..))
import Database.LSMTree.Internal.RunNumber
Expand Down Expand Up @@ -455,14 +454,13 @@ instance DecodeVersioned RunNumber where
-- SnapIncomingRun

instance Encode (SnapIncomingRun RunNumber) where
encode (SnapMergingRun mpfl nr ne uc mkc smrs) =
encodeListLen 7
encode (SnapMergingRun mpfl nr ne uc smrs) =
encodeListLen 6
<> encodeWord 0
<> encode mpfl
<> encode nr
<> encode ne
<> encode uc
<> encode mkc
<> encode smrs
encode (SnapSingleRun x) =
encodeListLen 2
Expand All @@ -474,9 +472,9 @@ instance DecodeVersioned (SnapIncomingRun RunNumber) where
n <- decodeListLen
tag <- decodeWord
case (n, tag) of
(7, 0) -> SnapMergingRun <$>
(6, 0) -> SnapMergingRun <$>
decodeVersioned v <*> decodeVersioned v <*> decodeVersioned v <*>
decodeVersioned v <*> decodeVersioned v <*> decodeVersioned v
decodeVersioned v <*> decodeVersioned v
(2, 1) -> SnapSingleRun <$> decodeVersioned v
_ -> fail ("[SnapMergingRun] Unexpected combination of list length and tag: " <> show (n, tag))

Expand Down Expand Up @@ -510,20 +508,6 @@ instance Encode UnspentCredits where
-- | Version @V0@: a single integer read with 'decodeInt' and wrapped in
-- 'UnspentCredits'.
instance DecodeVersioned UnspentCredits where
  decodeVersioned V0 = fmap UnspentCredits decodeInt

-- MergeKnownCompleted

instance Encode MR.MergeKnownCompleted where
encode MR.MergeKnownCompleted = encodeWord 0
encode MR.MergeMaybeCompleted = encodeWord 1

instance DecodeVersioned MR.MergeKnownCompleted where
decodeVersioned V0 = do
tag <- decodeWord
case tag of
0 -> pure MR.MergeKnownCompleted
1 -> pure MR.MergeMaybeCompleted
_ -> fail ("[MergeKnownCompleted] Unexpected tag: " <> show tag)

-- SnapMergingRunState

instance Encode (SnapMergingRunState RunNumber) where
Expand Down
13 changes: 4 additions & 9 deletions test/Test/Database/LSMTree/Internal/Snapshot/Codec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ testAll test = [
, test (Proxy @NumRuns)
, test (Proxy @MergePolicyForLevel)
, test (Proxy @UnspentCredits)
, test (Proxy @MergeKnownCompleted)
, test (Proxy @(SnapMergingRunState RunNumber))
, test (Proxy @SpentCredits)
, test (Proxy @Merge.Level)
Expand Down Expand Up @@ -287,12 +286,12 @@ deriving newtype instance Arbitrary RunNumber
instance Arbitrary (SnapIncomingRun RunNumber) where
arbitrary = oneof [
SnapMergingRun <$> arbitrary <*> arbitrary <*> arbitrary
<*> arbitrary <*> arbitrary <*> arbitrary
<*> arbitrary <*> arbitrary
, SnapSingleRun <$> arbitrary
]
shrink (SnapMergingRun a b c d e f) =
[ SnapMergingRun a' b' c' d' e' f'
| (a', b', c', d', e', f') <- shrink (a, b, c, d, e, f) ]
shrink (SnapMergingRun a b c d e) =
[ SnapMergingRun a' b' c' d' e'
| (a', b', c', d', e') <- shrink (a, b, c, d, e) ]
shrink (SnapSingleRun a) = SnapSingleRun <$> shrink a

deriving newtype instance Arbitrary NumRuns
Expand All @@ -303,10 +302,6 @@ instance Arbitrary MergePolicyForLevel where

deriving newtype instance Arbitrary UnspentCredits

instance Arbitrary MergeKnownCompleted where
arbitrary = elements [MergeKnownCompleted, MergeMaybeCompleted]
shrink _ = []

instance Arbitrary (SnapMergingRunState RunNumber) where
arbitrary = oneof [
SnapCompletedMerge <$> arbitrary
Expand Down

0 comments on commit 9c87411

Please sign in to comment.