Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add loop component 🎁🎄 #5429

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/backend/base/langflow/components/logic/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from .data_conditional_router import DataConditionalRouterComponent
from .flow_tool import FlowToolComponent
from .listen import ListenComponent
from .loop import LoopComponent
from .notify import NotifyComponent
from .pass_message import PassMessageComponent
from .run_flow import RunFlowComponent
Expand All @@ -12,6 +13,7 @@
"DataConditionalRouterComponent",
"FlowToolComponent",
"ListenComponent",
"LoopComponent",
"NotifyComponent",
"PassMessageComponent",
"RunFlowComponent",
Expand Down
83 changes: 83 additions & 0 deletions src/backend/base/langflow/components/logic/loop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from langflow.custom import Component
from langflow.io import DataInput, Output
from langflow.schema import Data


class LoopComponent(Component):
display_name = "Loop"
description = (
"Iterates over a list of Data objects, outputting one item at a time and aggregating results from loop inputs."
)
icon = "infinity"

inputs = [
DataInput(name="data", display_name="Data", info="The initial list of Data objects to iterate over."),
DataInput(name="loop", display_name="Loop Input", info="Data to aggregate during the iteration."),
]

outputs = [
Output(display_name="Item", name="item", method="item_output"),
Output(display_name="Done", name="done", method="done_output"),
]

def initialize_data(self):
"""Initialize the data list, context index, and aggregated list."""
if not self.ctx.get(f"{self._id}_initialized", False):
# Ensure data is a list of Data objects
if isinstance(self.data, Data):
data_list = [self.data]
elif isinstance(self.data, list):
data_list = self.data
else:
raise ValueError("The 'data' input must be a list of Data objects or a single Data object.")

Check failure on line 32 in src/backend/base/langflow/components/logic/loop.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.12)

Ruff (TRY003)

src/backend/base/langflow/components/logic/loop.py:32:23: TRY003 Avoid specifying long messages outside the exception class

Check failure on line 32 in src/backend/base/langflow/components/logic/loop.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.12)

Ruff (EM101)

src/backend/base/langflow/components/logic/loop.py:32:34: EM101 Exception must not use a string literal, assign to variable first

# Store the initial data and context variables
self.update_ctx(
{
f"{self._id}_data": data_list,
f"{self._id}_index": 0,
f"{self._id}_aggregated": [],
f"{self._id}_initialized": True,
}
)

def item_output(self) -> Data:
"""Output the next item in the list."""
self.initialize_data()

# Get data list and current index
data_list = self.ctx.get(f"{self._id}_data", [])
current_index = self.ctx.get(f"{self._id}_index", 0)

if current_index < len(data_list):
# Output current item
current_item = data_list[current_index]
self.update_ctx({f"{self._id}_index": current_index + 1})
print("item_output:", current_item)

Check failure on line 56 in src/backend/base/langflow/components/logic/loop.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.12)

Ruff (T201)

src/backend/base/langflow/components/logic/loop.py:56:13: T201 `print` found
return current_item
# No more items to output
self.stop("item")
return None

def done_output(self) -> Data:
"""Return the aggregated list once all items are processed."""
self.initialize_data()

# Get data list and aggregated list
data_list = self.ctx.get(f"{self._id}_data", [])
aggregated = self.ctx.get(f"{self._id}_aggregated", [])

# Check if loop input is provided
loop_input = self.loop
if loop_input:
# Append loop input to aggregated list
aggregated.append(loop_input)
self.update_ctx({f"{self._id}_aggregated": aggregated})

# Check if aggregation is complete
if len(aggregated) >= len(data_list):
print("done_output:", aggregated)

Check failure on line 79 in src/backend/base/langflow/components/logic/loop.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.12)

Ruff (T201)

src/backend/base/langflow/components/logic/loop.py:79:13: T201 `print` found
return [data for data in aggregated]

Check failure on line 80 in src/backend/base/langflow/components/logic/loop.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.12)

Ruff (C416)

src/backend/base/langflow/components/logic/loop.py:80:20: C416 Unnecessary list comprehension (rewrite using `list()`)
# Not all items have been processed yet
self.stop("done")
return None
Loading