Skip to content

Commit

Permalink
Rewriting internals for better performance (#301)
Browse files Browse the repository at this point in the history
* Rewriting internals for better performance.

* Consolidate side-effects.

* Perform transition effects internally.

* Handle termination gracefully.

* Handle input changes while running.

* More documentation + improved naming.

* Rename transitionId to evaluationId.

* Add an effect test that happens after child is terminated.

* Remove onPendingTransition.

* Revert side-effect execution to be global.

* Document globalEvaluationId.

* Link to PR in changelog.

* More docs.

* Use onPostTransition.

* Fix typo.
  • Loading branch information
Laimiux authored Jul 7, 2023
1 parent 48dab2e commit 81ebe09
Show file tree
Hide file tree
Showing 20 changed files with 555 additions and 291 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
- **Breaking**: ActivityStoreContext.selectActivityEvents and StreamConfigurator.update now explicitly requires a non-null type. This allows Formula to be compatible with Kotlin 1.8
- **Breaking**: Remove `Integration` and `FragmentContract`
- **Breaking**: Remove `Stream`, `StreamBuilder` and `StreamFormula`
- **Breaking**: Rewrite internals of formula action handling to enable inline re-evaluation. Lots of changes, can review them in https://github.com/instacart/formula/pull/301

## [0.7.1] - June 28, 2022
- **Breaking**: Rename `FragmentBindingBuilder` to `FragmentStoreBuilder`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package com.instacart.formula.test

import com.instacart.formula.DeferredAction
import com.instacart.formula.Inspector
import com.instacart.formula.Transition
import java.lang.AssertionError
import kotlin.reflect.KClass

Expand All @@ -26,14 +25,8 @@ class CountingInspector : Inspector {
actionsStarted.add(formulaType.java)
}

override fun onTransition(
formulaType: KClass<*>,
result: Transition.Result<*>,
evaluate: Boolean
) {
if (result is Transition.Result.Stateful) {
stateTransitions.add(formulaType.java)
}
override fun onStateChanged(formulaType: KClass<*>, old: Any?, new: Any?) {
stateTransitions.add(formulaType.java)
}

fun assertEvaluationCount(expected: Int) = apply {
Expand Down
194 changes: 111 additions & 83 deletions formula/src/main/java/com/instacart/formula/FormulaRuntime.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import com.instacart.formula.internal.FormulaManagerImpl
import com.instacart.formula.internal.ManagerDelegate
import com.instacart.formula.internal.ThreadChecker
import java.util.LinkedList
import kotlin.reflect.KClass

/**
* Takes a [Formula] and creates an Observable<Output> from it.
Expand All @@ -23,13 +22,33 @@ class FormulaRuntime<Input : Any, Output : Any>(
private var hasInitialFinished = false
private var emitOutput = false
private var lastOutput: Output? = null
private var executionRequested: Boolean = false

private val effectQueue = LinkedList<Effects>()

private var input: Input? = null
private var key: Any? = null
private var isExecuting: Boolean = false

/**
* Determines if we are executing within [runFormula] block. It prevents to
* enter [runFormula] block when we are already within it.
*/
private var isRunning: Boolean = false

/**
* When we are within the [run] block, inputId allows us to notice when input has changed
* and to re-run when that happens.
*/
private var inputId: Int = 0

/**
* Global transition effect queue which executes side-effects
* after all formulas are idle.
*/
private var globalEffectQueue = LinkedList<Effects>()

/**
* Determines if we are iterating through [globalEffectQueue]. It prevents us from
* entering executeTransitionEffects block when we are already within it.
*/
private var isExecutingEffects: Boolean = false

fun isKeyValid(input: Input): Boolean {
return this.input == null || key == formula.key(input)
Expand All @@ -42,82 +61,105 @@ class FormulaRuntime<Input : Any, Output : Any>(

if (initialization) {
manager = FormulaManagerImpl(this, implementation, input, loggingType = formula::class, inspector = inspector)
forceRun()
run()

hasInitialFinished = true
emitOutputIfNeeded(isInitialRun = true)
} else {
forceRun()
inputId += 1
run()
}
}

fun terminate() {
manager?.apply {
markAsTerminated()
performTerminationSideEffects()

/**
* The way termination side-effects are performed:
* - If we are not running, let's perform them here
* - If we are running, runFormula() will handle them
*
* This way, we let runFormula() exit out before we terminate everything.
*/
if (!isRunning) {
performTermination()
}
}
}

override fun onTransition(formulaType: KClass<*>, result: Transition.Result<*>, evaluate: Boolean) {
threadChecker.check("Only thread that created it can trigger transitions.")

inspector?.onTransition(formulaType, result, evaluate)
override fun onPostTransition(effects: Effects?, evaluate: Boolean) {
threadChecker.check("Only thread that created it can post transition result")

result.effects?.let {
effectQueue.addLast(it)
effects?.let {
globalEffectQueue.addLast(effects)
}

run(evaluate = evaluate)
if (effects != null || evaluate) {
run(evaluate = evaluate)
}
}

private fun forceRun() = run(evaluate = true)

/**
* Performs the evaluation and execution phases.
*
* @param evaluate Determines if evaluation needs to be run.
*/
private fun run(evaluate: Boolean) {
private fun run(evaluate: Boolean = true) {
if (isRunning) return

try {
runFormula(evaluate)
if (!isExecuting) {
val manager = checkNotNull(manager)

if (evaluate) {
var shouldRun = true
while (shouldRun) {
val localInputId = inputId
if (!manager.isTerminated()) {
isRunning = true
inspector?.onRunStarted(true)

val currentInput = checkNotNull(input)
runFormula(manager, currentInput)
isRunning = false

inspector?.onRunFinished()

/**
* If termination happened during runFormula() execution, let's perform
* termination side-effects here.
*/
if (manager.isTerminated()) {
shouldRun = false
performTermination()
} else {
shouldRun = localInputId != inputId
}
} else {
shouldRun = false
}
}
}

if (isExecutingEffects) return
executeTransitionEffects()

if (!manager.isTerminated()) {
emitOutputIfNeeded(isInitialRun = false)
}
} catch (e: Throwable) {
isRunning = false

manager?.markAsTerminated()
onError(e)
manager?.performTerminationSideEffects()
}
}

private fun runFormula(evaluate: Boolean) {
val freshRun = !isExecuting
if (freshRun) {
inspector?.onRunStarted(evaluate)
}

val manager = checkNotNull(manager)
val currentInput = checkNotNull(input)

if (evaluate && !manager.terminated) {
evaluationPhase(manager, currentInput)
}

executionRequested = true
if (isExecuting) return

executionPhase(manager)

if (freshRun) {
inspector?.onRunFinished()
performTermination()
}
}

/**
* Runs formula evaluation.
*/
private fun evaluationPhase(manager: FormulaManager<Input, Output>, currentInput: Input) {
val result = manager.evaluate(currentInput)
private fun runFormula(manager: FormulaManager<Input, Output>, currentInput: Input) {
val result = manager.run(currentInput)
lastOutput = result.output
emitOutput = true

Expand All @@ -128,51 +170,27 @@ class FormulaRuntime<Input : Any, Output : Any>(
// We run evaluation again in validation mode which ensures validates
// that inputs and outputs are stable and do not break equality across
// identical runs.
manager.evaluate(currentInput)
manager.run(currentInput)
} finally {
manager.setValidationRun(false)
}
}
}

/**
* Executes operations containing side-effects such as starting/terminating streams.
*/
private fun executionPhase(manager: FormulaManagerImpl<Input, *, Output>) {
isExecuting = true
while (executionRequested) {
executionRequested = false

val transitionId = manager.transitionID
if (!manager.terminated) {
if (manager.executeUpdates()) {
continue
}
}

// We execute pending side-effects even after termination
if (executeEffects(manager, transitionId)) {
continue
}
}
isExecuting = false
}

/**
* Executes effects from the [effectQueue].
* Iterates through and executes pending transition side-effects. It will keep going until the
* whole queue is empty. If any transition happens due to an executed effect:
* - If state change happens, [runFormula] will run before next effect is executed
* - New transition effects will be added to [globalEffectQueue] which will be picked up within
* this loop
*/
private fun executeEffects(manager: FormulaManagerImpl<*, *, *>, transitionId: Long): Boolean {
while (effectQueue.isNotEmpty()) {
val effects = effectQueue.pollFirst()
if (effects != null) {
effects.execute()

if (manager.hasTransitioned(transitionId)) {
return true
}
}
private fun executeTransitionEffects() {
isExecutingEffects = true
while (globalEffectQueue.isNotEmpty()) {
val effects = globalEffectQueue.pollFirst()
effects.execute()
}
return false
isExecutingEffects = false
}

/**
Expand All @@ -186,4 +204,14 @@ class FormulaRuntime<Input : Any, Output : Any>(
onOutput(checkNotNull(lastOutput))
}
}

/**
* Performs formula termination effects and executes transition effects if needed.
*/
private fun performTermination() {
manager?.performTerminationSideEffects()
if (!isExecutingEffects) {
executeTransitionEffects()
}
}
}
8 changes: 4 additions & 4 deletions formula/src/main/java/com/instacart/formula/Inspector.kt
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ interface Inspector {
fun onActionFinished(formulaType: KClass<*>, action: DeferredAction<*>) = Unit

/**
* Called when a transition happens
* Called when a state change happens
*
* @param formulaType Formula type used to filter for specific events.
* @param result Transition result used to check what kind of transition.
* @param evaluate Indicates if transition requires a new evaluation.
* @param old Previous state value
* @param new New state value
*/
fun onTransition(formulaType: KClass<*>, result: Transition.Result<*>, evaluate: Boolean) = Unit
fun onStateChanged(formulaType: KClass<*>, old: Any?, new: Any?) = Unit

/**
* Called when [FormulaRuntime.run] is called. This method in combination with [onRunFinished]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ internal class ActionManager(
private var removeListInvalidated: Boolean = false
private var scheduledForRemoval: MutableList<DeferredAction<*>>? = null

fun onNewFrame(new: Set<DeferredAction<*>>) {
/**
* After evaluation, we might have a new list of actions that we need
* to start and some old ones that will need to be terminates. This function
* prepares for that work which will be performed in [terminateOld] and [startNew].
*/
fun prepareForPostEvaluation(new: Set<DeferredAction<*>>) {
actions = new

startListInvalidated = true
Expand All @@ -35,7 +40,7 @@ internal class ActionManager(
/**
* Returns true if there was a transition while terminating streams.
*/
fun terminateOld(transitionID: Long): Boolean {
fun terminateOld(evaluationId: Long): Boolean {
prepareStoppedActionList()

if (scheduledForRemoval.isNullOrEmpty()) {
Expand All @@ -52,15 +57,19 @@ internal class ActionManager(
running?.remove(action)
finishAction(action)

if (manager.hasTransitioned(transitionID)) {
if (manager.isTerminated()) {
return false
}

if (!manager.canUpdatesContinue(evaluationId)) {
return true
}
}
}
return false
}

fun startNew(transitionID: Long): Boolean {
fun startNew(evaluationId: Long): Boolean {
prepareNewActionList()

val scheduled = scheduledToStart ?: return false
Expand All @@ -79,7 +88,11 @@ internal class ActionManager(
getOrInitRunningActions().add(action)
action.start()

if (manager.hasTransitioned(transitionID)) {
if (manager.isTerminated()) {
return false
}

if (!manager.canUpdatesContinue(evaluationId)) {
return true
}
}
Expand Down
Loading

0 comments on commit 81ebe09

Please sign in to comment.