diff --git a/marlowe-apps/changelog.d/20230918_161023_brian.bush_PLT_7288.rst b/marlowe-apps/changelog.d/20230918_161023_brian.bush_PLT_7288.rst new file mode 100644 index 0000000000..54f9891b5c --- /dev/null +++ b/marlowe-apps/changelog.d/20230918_161023_brian.bush_PLT_7288.rst @@ -0,0 +1,4 @@ +Changed +------- + +- Fixed `marlowe-apps` and `marlowe-finder` to handle new Marlowe chain sync protocol semantics where payout redemption is present in the event stream. diff --git a/marlowe-apps/src/Language/Marlowe/Runtime/App/Channel.hs b/marlowe-apps/src/Language/Marlowe/Runtime/App/Channel.hs index 5d677076c8..9c6e537f91 100644 --- a/marlowe-apps/src/Language/Marlowe/Runtime/App/Channel.hs +++ b/marlowe-apps/src/Language/Marlowe/Runtime/App/Channel.hs @@ -41,7 +41,7 @@ import Language.Marlowe.Runtime.App.Stream ( import Language.Marlowe.Runtime.App.Types import Language.Marlowe.Runtime.ChainSync.Api (BlockHeader, ChainPoint, TxId) import Language.Marlowe.Runtime.Core.Api (ContractId, MarloweVersionTag (V1)) -import Language.Marlowe.Runtime.History.Api (ContractStep, CreateStep) +import Language.Marlowe.Runtime.History.Api (ContractStep (ApplyTransaction, RedeemPayout), CreateStep) import Observe.Event.Backend (hoistEventBackend) import Observe.Event.Dynamic (DynamicEventSelector (..), DynamicField) import Observe.Event.Explicit (Event, EventBackend, addField, withEvent) @@ -140,16 +140,21 @@ runDetection accept eventBackend config pollingFrequency finishOnClose finishOnW data LastSeen = LastSeen { thisContractId :: ContractId + -- ^ The ID of the contract. , theseSteps :: [ContractStep 'V1] + -- ^ The contract steps that were applied since the previous report. , lastContract :: Contract + -- ^ The most recent contract body for this ID. , lastTxId :: TxId + -- ^ The most recent transaction ID for this contract. , ignoredTxIds :: S.Set TxId + -- ^ The set of transactions already reported for this contract. } deriving (Show) newtype RequeueFrequency = RequeueFrequency Second --- | Run a function for each open transaction of each contract, repeating periodically. +-- | Run a function for each open transaction of each contract, repeating periodically. Note that this does not visit every transaction in the contract: instead it only visits the transactions at the tip of the contract. runContractAction :: forall r . Text @@ -171,12 +176,19 @@ runContractAction selectorName eventBackend runInput (RequeueFrequency requeueFr update :: Event IO r DynamicField -> ContractStream 'V1 -> M.Map ContractId LastSeen -> IO (M.Map ContractId LastSeen) update event cs lastSeen = let contractId = csContractId cs - in case (contractId `M.lookup` lastSeen, contractFromStream cs, transactionIdFromStream cs) of - (Nothing, Just contract, Just txId) -> pure $ M.insert contractId (LastSeen contractId mempty contract txId mempty) lastSeen - (Just seen, Just contract, Just txId) -> pure $ M.insert contractId (seen{lastContract = contract, lastTxId = txId}) lastSeen - (Just _, Nothing, Just _) -> pure $ M.delete contractId lastSeen - (seen, _, _) -> do - -- FIXME: Diagnose and remedy situations if this ever occurs. + in case (contractId `M.lookup` lastSeen, contractFromStream cs, transactionIdFromStream cs, cs) of + -- The contract is created, so record its ID, body, and most recent transaction in the map of most-recent information for contracts still open. + (Nothing, Just contract, Just txId, ContractStreamStart{}) -> pure $ M.insert contractId (LastSeen contractId mempty contract txId mempty) lastSeen + -- Input was applied to the contract, which is still open, so update its body and most recent transaction in the map of most-recent information for contracts still open. + (Just seen, Just contract, Just txId, ContractStreamContinued{csContractStep = ApplyTransaction{}}) -> pure $ M.insert contractId (seen{lastContract = contract, lastTxId = txId}) lastSeen + -- Input was applied to the contract, but it is now closed, so delete it from the map of most-recent information for contracts still open. + (Just _, Nothing, Just _, ContractStreamContinued{csContractStep = ApplyTransaction{}}) -> pure $ M.delete contractId lastSeen + -- A payout was redeemed from the contract, so there is no need to update the map of most-recent information for contracts still open. + (Just _, _, Just _, ContractStreamContinued{csContractStep = RedeemPayout{}}) -> pure lastSeen + -- A payout was redeemed from the contract after the contract closed, so there is no need to update the map of most-recent information for contracts still open. + (Nothing, _, Just _, ContractStreamContinued{csContractStep = RedeemPayout{}}) -> pure lastSeen + -- FIXME: This should be impossible because a contract must either be created, continuing, closing or redeeming, but diagnose and remedy if this ever occurs. + (seen, _, _, _) -> do addField event $ ("invalidContractStream" :: Text) ≔ object @@ -200,6 +212,7 @@ runContractAction selectorName eventBackend runInput (RequeueFrequency requeueFr go :: M.Map ContractId LastSeen -> IO () go lastSeen = do + -- The `lastSeen` map tracks the most-recent information about contracts that are still open. lastSeen' <- withEvent eventBackend (DynamicEventSelector selectorName) $ \event -> runExceptT $ @@ -207,31 +220,40 @@ runContractAction selectorName eventBackend runInput (RequeueFrequency requeueFr cs <- ExceptT . atomically $ readTChan inChannel liftIO . addField event $ ("contractId" :: Text) ≔ csContractId cs liftIO $ case cs of + -- Add the contract to `lastSeen` when it is created. ContractStreamStart{} -> do addField event $ ("action" :: Text) ≔ ("start" :: String) update event cs lastSeen + -- Update the contract information in `lastSeen` when it is continued by applying input or withdrawing a payout. ContractStreamContinued{} -> do addField event $ ("action" :: Text) ≔ ("continued" :: String) update event cs lastSeen + -- Process a rollback, though nothing is required because the rolled-back aspects of the contract will naturally be replayed. ContractStreamRolledBack{} -> do addField event $ ("action" :: Text) ≔ ("rollback" :: String) pure $ rollback cs lastSeen + -- The end of the stream for a contract has been reached, but there may be further progression of the contract when it is revisited in future followings. ContractStreamWait{..} -> do addField event $ ("action" :: Text) ≔ ("wait" :: String) case csContractId `M.lookup` lastSeen of + -- We can only reach the tip of the contract if the contract was previously seen. Just seen@LastSeen{lastTxId} -> do + -- Supply the contract information to the user-defined processing function if we haven't done so already at this tip (transaction). unless (lastTxId `S.member` ignoredTxIds seen) $ runInput event seen + -- Re-queue the contract ID so it is followed later, since there may then be new transactions beyond the present tip. revisit csContractId + -- Remember to not call the user-defined processing function again a this tip (transaction). pure $ ignore lastTxId csContractId lastSeen + -- FIXME: Diagnose and remedy situations if this ever occurs. _ -> do - -- FIXME: Diagnose and remedy situations if this ever occurs. addField event $ ("invalidContractStream" :: Text) ≔ object ["contractStream" .= cs] pure lastSeen + -- The stream of contract information is complete because the contract closed, so we don't need to track it anymore. ContractStreamFinish{..} -> do addField event $ ("action" :: Text) ≔ ("finish" :: String) pure $ delete csContractId lastSeen diff --git a/marlowe-apps/src/Language/Marlowe/Runtime/App/Stream.hs b/marlowe-apps/src/Language/Marlowe/Runtime/App/Stream.hs index 13ecd2d157..8d6b578a2f 100644 --- a/marlowe-apps/src/Language/Marlowe/Runtime/App/Stream.hs +++ b/marlowe-apps/src/Language/Marlowe/Runtime/App/Stream.hs @@ -59,7 +59,11 @@ import Language.Marlowe.Runtime.Core.Api ( assertVersionsEqual, ) import Language.Marlowe.Runtime.Discovery.Api (ContractHeader (blockHeader, contractId)) -import Language.Marlowe.Runtime.History.Api (ContractStep (ApplyTransaction), CreateStep (CreateStep, createOutput)) +import Language.Marlowe.Runtime.History.Api ( + ContractStep (ApplyTransaction, RedeemPayout), + CreateStep (CreateStep, createOutput), + RedeemStep (RedeemStep, redeemingTx), + ) import Observe.Event.Dynamic (DynamicEventSelector (..)) import Observe.Event.Explicit (EventBackend, addField, withEvent) import Observe.Event.Syntax ((≔)) @@ -228,6 +232,7 @@ transactionIdFromStream -> Maybe TxId transactionIdFromStream ContractStreamStart{csCreateStep = CreateStep{createOutput = TransactionScriptOutput{utxo = TxOutRef{txId}}}} = pure txId transactionIdFromStream ContractStreamContinued{csContractStep = (ApplyTransaction Transaction{transactionId})} = pure transactionId +transactionIdFromStream ContractStreamContinued{csContractStep = (RedeemPayout RedeemStep{redeemingTx})} = pure redeemingTx transactionIdFromStream _ = Nothing isContractStreamFinish