Skip to content

Commit

Permalink
Merge pull request #713 from input-output-hk/PLT-7288
Browse files Browse the repository at this point in the history
PLT-7288 Added handling of redemptions in contract streaming.
  • Loading branch information
bwbush authored Sep 24, 2023
2 parents 5590faa + 3079e19 commit 39eecd8
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Changed
-------

- Fixed `marlowe-apps` and `marlowe-finder` to handle new Marlowe chain sync protocol semantics where payout redemption is present in the event stream.
40 changes: 31 additions & 9 deletions marlowe-apps/src/Language/Marlowe/Runtime/App/Channel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import Language.Marlowe.Runtime.App.Stream (
import Language.Marlowe.Runtime.App.Types
import Language.Marlowe.Runtime.ChainSync.Api (BlockHeader, ChainPoint, TxId)
import Language.Marlowe.Runtime.Core.Api (ContractId, MarloweVersionTag (V1))
import Language.Marlowe.Runtime.History.Api (ContractStep, CreateStep)
import Language.Marlowe.Runtime.History.Api (ContractStep (ApplyTransaction, RedeemPayout), CreateStep)
import Observe.Event.Backend (hoistEventBackend)
import Observe.Event.Dynamic (DynamicEventSelector (..), DynamicField)
import Observe.Event.Explicit (Event, EventBackend, addField, withEvent)
Expand Down Expand Up @@ -140,16 +140,21 @@ runDetection accept eventBackend config pollingFrequency finishOnClose finishOnW

data LastSeen = LastSeen
{ thisContractId :: ContractId
-- ^ The ID of the contract.
, theseSteps :: [ContractStep 'V1]
-- ^ The contract steps that were applied since the previous report.
, lastContract :: Contract
-- ^ The most recent contract body for this ID.
, lastTxId :: TxId
-- ^ The most recent transaction ID for this contract.
, ignoredTxIds :: S.Set TxId
-- ^ The set of transactions already reported for this contract.
}
deriving (Show)

newtype RequeueFrequency = RequeueFrequency Second

-- | Run a function for each open transaction of each contract, repeating periodically.
-- | Run a function for each open transaction of each contract, repeating periodically. Note that this does not visit every transaction in the contract: instead it only visits the transactions at the tip of the contract.
runContractAction
:: forall r
. Text
Expand All @@ -171,12 +176,19 @@ runContractAction selectorName eventBackend runInput (RequeueFrequency requeueFr
update :: Event IO r DynamicField -> ContractStream 'V1 -> M.Map ContractId LastSeen -> IO (M.Map ContractId LastSeen)
update event cs lastSeen =
let contractId = csContractId cs
in case (contractId `M.lookup` lastSeen, contractFromStream cs, transactionIdFromStream cs) of
(Nothing, Just contract, Just txId) -> pure $ M.insert contractId (LastSeen contractId mempty contract txId mempty) lastSeen
(Just seen, Just contract, Just txId) -> pure $ M.insert contractId (seen{lastContract = contract, lastTxId = txId}) lastSeen
(Just _, Nothing, Just _) -> pure $ M.delete contractId lastSeen
(seen, _, _) -> do
-- FIXME: Diagnose and remedy situations if this ever occurs.
in case (contractId `M.lookup` lastSeen, contractFromStream cs, transactionIdFromStream cs, cs) of
-- The contract is created, so record its ID, body, and most recent transaction in the map of most-recent information for contracts still open.
(Nothing, Just contract, Just txId, ContractStreamStart{}) -> pure $ M.insert contractId (LastSeen contractId mempty contract txId mempty) lastSeen
-- Input was applied to the contract, which is still open, so update its body and most recent transaction in the map of most-recent information for contracts still open.
(Just seen, Just contract, Just txId, ContractStreamContinued{csContractStep = ApplyTransaction{}}) -> pure $ M.insert contractId (seen{lastContract = contract, lastTxId = txId}) lastSeen
-- Input was applied to the contract, but it is now closed, so delete it from the map of most-recent information for contracts still open.
(Just _, Nothing, Just _, ContractStreamContinued{csContractStep = ApplyTransaction{}}) -> pure $ M.delete contractId lastSeen
-- A payout was redeemed from the contract, so there is no need to update the map of most-recent information for contracts still open.
(Just _, _, Just _, ContractStreamContinued{csContractStep = RedeemPayout{}}) -> pure lastSeen
-- A payout was redeemed from the contract after the contract closed, so there is no need to update the map of most-recent information for contracts still open.
(Nothing, _, Just _, ContractStreamContinued{csContractStep = RedeemPayout{}}) -> pure lastSeen
-- FIXME: This should be impossible because a contract must either be created, continuing, closing or redeeming, but diagnose and remedy if this ever occurs.
(seen, _, _, _) -> do
addField event $
("invalidContractStream" :: Text)
object
Expand All @@ -200,38 +212,48 @@ runContractAction selectorName eventBackend runInput (RequeueFrequency requeueFr
go :: M.Map ContractId LastSeen -> IO ()
go lastSeen =
do
-- The `lastSeen` map tracks the most-recent information about contracts that are still open.
lastSeen' <-
withEvent eventBackend (DynamicEventSelector selectorName) $
\event -> runExceptT $
do
cs <- ExceptT . atomically $ readTChan inChannel
liftIO . addField event $ ("contractId" :: Text) csContractId cs
liftIO $ case cs of
-- Add the contract to `lastSeen` when it is created.
ContractStreamStart{} -> do
addField event $ ("action" :: Text) ("start" :: String)
update event cs lastSeen
-- Update the contract information in `lastSeen` when it is continued by applying input or withdrawing a payout.
ContractStreamContinued{} -> do
addField event $ ("action" :: Text) ("continued" :: String)
update event cs lastSeen
-- Process a rollback, though nothing is required because the rolled-back aspects of the contract will naturally be replayed.
ContractStreamRolledBack{} -> do
addField event $ ("action" :: Text) ("rollback" :: String)
pure $ rollback cs lastSeen
-- The end of the stream for a contract has been reached, but there may be further progression of the contract when it is revisited in future followings.
ContractStreamWait{..} -> do
addField event $ ("action" :: Text) ("wait" :: String)
case csContractId `M.lookup` lastSeen of
-- We can only reach the tip of the contract if the contract was previously seen.
Just seen@LastSeen{lastTxId} ->
do
-- Supply the contract information to the user-defined processing function if we haven't done so already at this tip (transaction).
unless (lastTxId `S.member` ignoredTxIds seen) $
runInput event seen
-- Re-queue the contract ID so it is followed later, since there may then be new transactions beyond the present tip.
revisit csContractId
-- Remember to not call the user-defined processing function again a this tip (transaction).
pure $ ignore lastTxId csContractId lastSeen
-- FIXME: Diagnose and remedy situations if this ever occurs.
_ ->
do
-- FIXME: Diagnose and remedy situations if this ever occurs.
addField event $
("invalidContractStream" :: Text)
object ["contractStream" .= cs]
pure lastSeen
-- The stream of contract information is complete because the contract closed, so we don't need to track it anymore.
ContractStreamFinish{..} -> do
addField event $ ("action" :: Text) ("finish" :: String)
pure $ delete csContractId lastSeen
Expand Down
7 changes: 6 additions & 1 deletion marlowe-apps/src/Language/Marlowe/Runtime/App/Stream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ import Language.Marlowe.Runtime.Core.Api (
assertVersionsEqual,
)
import Language.Marlowe.Runtime.Discovery.Api (ContractHeader (blockHeader, contractId))
import Language.Marlowe.Runtime.History.Api (ContractStep (ApplyTransaction), CreateStep (CreateStep, createOutput))
import Language.Marlowe.Runtime.History.Api (
ContractStep (ApplyTransaction, RedeemPayout),
CreateStep (CreateStep, createOutput),
RedeemStep (RedeemStep, redeemingTx),
)
import Observe.Event.Dynamic (DynamicEventSelector (..))
import Observe.Event.Explicit (EventBackend, addField, withEvent)
import Observe.Event.Syntax ((≔))
Expand Down Expand Up @@ -228,6 +232,7 @@ transactionIdFromStream
-> Maybe TxId
transactionIdFromStream ContractStreamStart{csCreateStep = CreateStep{createOutput = TransactionScriptOutput{utxo = TxOutRef{txId}}}} = pure txId
transactionIdFromStream ContractStreamContinued{csContractStep = (ApplyTransaction Transaction{transactionId})} = pure transactionId
transactionIdFromStream ContractStreamContinued{csContractStep = (RedeemPayout RedeemStep{redeemingTx})} = pure redeemingTx
transactionIdFromStream _ = Nothing

isContractStreamFinish
Expand Down

0 comments on commit 39eecd8

Please sign in to comment.