Skip to content

Commit

Permalink
introduce a Sequence::Compiler that doesn't use the Intermediate stru…
Browse files Browse the repository at this point in the history
…ctures

but instantly compiles a Schema. currently, this is 1,3x faster.
  • Loading branch information
apotonick committed Sep 3, 2024
1 parent d212bb9 commit 74d5064
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 36 deletions.
150 changes: 116 additions & 34 deletions lib/trailblazer/activity/dsl/linear/sequence/compiler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,51 +8,66 @@ module Compiler
module_function

# Default strategy to find out what's a stop event is to inspect the TaskRef's {data[:stop_event]}.
def find_termini(intermediate_wiring)
intermediate_wiring
.find_all { |task_ref, _| task_ref.data[:stop_event] }
.collect { |task_ref, _| [task_ref.id, task_ref.data.fetch(:semantic)] }
def find_termini(sequence)
sequence
.find_all { |_, _, _, data| data[:stop_event] }
.collect { |_, task, _, data| [task, data.fetch(:semantic)] }
.to_h
end

# The first task in the wiring is the default start task.
def find_start_task_id(intermediate_wiring) # FIXME: shouldn't we use sequence here? and Row#id?
intermediate_wiring.first.first.id
def find_start_task(sequence)
sequence[0][1]
end

def call(sequence, find_stops: method(:find_termini), find_start: method(:find_start_task_id))
_implementations, intermediate_wiring =
sequence.inject([[], []]) do |(implementations, intermediates), seq_row|
_magnetic_to, task, connections, data = seq_row
id = data[:id]
def call(sequence, find_stops: Compiler.method(:find_termini), find_start: method(:find_start_task))
nodes_attributes = []

# execute all {Search}s for one sequence row.
connections = find_connections(seq_row, connections, sequence)
wiring = sequence.collect do |seq_row|
_magnetic_to, task, connections, data = seq_row

# FIXME: {:extensions} should be initialized
implementations += [[id, Schema::Implementation::Task(task, connections.collect { |output, _| output }, data[:extensions] || [])]]
id = data[:id]

intermediates += [
[
Schema::Intermediate::TaskRef(id, data),
# Compute outputs.
connections.collect { |output, target_id| Schema::Intermediate::Out(output.semantic, target_id) }
]
]
# execute all {Search}s for one sequence row.
connections = find_connections(seq_row, connections, sequence)

[implementations, intermediates]
end
circuit_connections = connections.collect { |output, target_task| [output.signal, target_task] }.to_h

start_task_id = find_start.(intermediate_wiring)
terminus_to_semantic = find_stops.(intermediate_wiring)
# nodes_attributes:
outputs = connections.keys
nodes_attributes << [
id,
task,
{}, # TODO: allow adding data from implementation.
outputs
]

intermediate = Schema::Intermediate.new(intermediate_wiring.to_h, terminus_to_semantic, start_task_id)
implementation = _implementations.to_h
[
task,
circuit_connections
]
end.to_h

Schema::Intermediate::Compiler.(intermediate, implementation) # implemented in the generic {trailblazer-activity} gem.
end
termini = find_stops.(sequence) # {task => semantic}
start_task = find_start.(sequence)

circuit = Trailblazer::Activity::Circuit.new(
wiring,
termini.keys, # termini
start_task: start_task
)

# activity_outputs = [Activity::Output(steps[last_step_i], :success)]
activity_outputs = termini.collect { |terminus, semantic| Activity::Output(terminus, semantic) }

# private
config = Activity::Schema::Intermediate::Compiler::DEFAULT_CONFIG

return circuit,
activity_outputs,
Schema::Nodes(nodes_attributes),
config

Schema.new(circuit, outputs, nodes, config)
end

# Execute all search strategies for a row, retrieve outputs and
# their respective target IDs.
Expand All @@ -64,10 +79,77 @@ def find_connections(seq_row, searches, sequence)

[
output,
target_seq_row.id
target_seq_row[1]
]
end
end.to_h
end

# FIXME: remove me once the direct Schema compilation is running and benchmarked.
module WithIntermediate
module_function


# The first task in the wiring is the default start task.
def find_start_task_id(intermediate_wiring) # FIXME: shouldn't we use sequence here? and Row#id?
intermediate_wiring.first.first.id
end

def find_termini(intermediate_wiring)
intermediate_wiring
.find_all { |task_ref, _| task_ref.data[:stop_event] }
.collect { |task_ref, _| [task_ref.id, task_ref.data.fetch(:semantic)] }
.to_h
end

def call(sequence, find_stops: method(:find_termini), find_start: method(:find_start_task_id))
_implementations, intermediate_wiring =
sequence.inject([[], []]) do |(implementations, intermediates), seq_row|
_magnetic_to, task, connections, data = seq_row
id = data[:id]

# execute all {Search}s for one sequence row.
connections = find_connections(seq_row, connections, sequence)

# FIXME: {:extensions} should be initialized
implementations += [[id, Schema::Implementation::Task(task, connections.collect { |output, _| output }, data[:extensions] || [])]]

intermediates += [
[
Schema::Intermediate::TaskRef(id, data),
# Compute outputs.
connections.collect { |output, target_id| Schema::Intermediate::Out(output.semantic, target_id) }
]
]

[implementations, intermediates]
end

start_task_id = find_start.(intermediate_wiring)
terminus_to_semantic = find_stops.(intermediate_wiring)

intermediate = Schema::Intermediate.new(intermediate_wiring.to_h, terminus_to_semantic, start_task_id)
implementation = _implementations.to_h

Schema::Intermediate::Compiler.(intermediate, implementation) # implemented in the generic {trailblazer-activity} gem.
end

# private

# Execute all search strategies for a row, retrieve outputs and
# their respective target IDs.
def find_connections(seq_row, searches, sequence)
searches.collect do |search|
output, target_seq_row = search.(sequence, seq_row) # invoke the node's "connection search" strategy.

target_seq_row = sequence[-1] if target_seq_row.nil? # connect to an End if target unknown. # DISCUSS: make this configurable, maybe?

[
output,
target_seq_row.id
]
end
end
end # WithIntermediate
end # Compiler
end # Sequence
end
Expand Down
10 changes: 8 additions & 2 deletions lib/trailblazer/activity/dsl/linear/strategy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,15 @@ def terminus(*args)
raise $!, "#{self}:#{$!.message}"
end

# COMPILER_FIXME = Sequence::Compiler::WithIntermediate
@@Compiler___Fixme = Sequence::Compiler
def Compiler___Fixme=(value)
@@Compiler___Fixme = value
end

private def recompile_activity(sequence)
schema = Sequence::Compiler.(sequence)
# schema = Sequence::Compiler.(sequence)
schema = @@Compiler___Fixme.(sequence)
Activity.new(schema)
end

Expand Down Expand Up @@ -112,7 +119,6 @@ def invoke(*args, **kws)
TaskWrap.invoke(self, *args, **kws)
end
end # class << self
# FIXME: do we want class << self?!

module DSL
module_function
Expand Down

0 comments on commit 74d5064

Please sign in to comment.