Skip to content

Commit

Permalink
FIX: Clean up the advanced parameters in Astra DB Vector Store Component (#5298)
Browse files Browse the repository at this point in the history

* fix: Clean up the list of params in AstraDB

* Clean up some more parameters

* Update Vector Store RAG.json

* [autofix.ci] apply automated fixes

* Update Vector Store RAG.json

* [autofix.ci] apply automated fixes

* Update Vector Store RAG.json

* Update astradb.py

* [autofix.ci] apply automated fixes

* Update Vector Store RAG.json

* [autofix.ci] apply automated fixes

* Update Vector Store RAG.json

* [autofix.ci] apply automated fixes

* Error if no file provided

* Fix base file value to be empty

* [autofix.ci] apply automated fixes

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
  • Loading branch information
erichare and autofix-ci[bot] authored Dec 17, 2024
1 parent ba75a15 commit a3d238c
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 708 deletions.
1 change: 1 addition & 0 deletions src/backend/base/langflow/base/data/base_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ def __init__(self, *args, **kwargs):
fileTypes=[], # Dynamically set in __init__
info="", # Dynamically set in __init__
required=False,
value="",
),
HandleInput(
name="file_path",
Expand Down
4 changes: 2 additions & 2 deletions src/backend/base/langflow/components/data/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ def process_file(file_path: str, *, silent_errors: bool = False) -> Data | None:
return None

if not file_list:
self.log("No files to process.")
return file_list
msg = "No files to process."
raise ValueError(msg)

concurrency = 1 if not self.use_multithreading else max(1, self.concurrency_multithreading)
file_count = len(file_list)
Expand Down
202 changes: 44 additions & 158 deletions src/backend/base/langflow/components/vectorstores/astradb.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import os
from collections import defaultdict

import orjson
from astrapy import DataAPIClient
from astrapy.admin import parse_api_endpoint
from langchain_astradb import AstraDBVectorStore
Expand Down Expand Up @@ -113,15 +112,33 @@ class AstraDBVectorStoreComponent(LCVectorStoreComponent):
info="Optional keyspace within Astra DB to use for the collection.",
advanced=True,
),
DropdownInput(
name="embedding_choice",
display_name="Embedding Model or Astra Vectorize",
info="Determines whether to use Astra Vectorize for the collection.",
options=["Embedding Model", "Astra Vectorize"],
real_time_refresh=True,
value="Embedding Model",
),
HandleInput(
name="embedding_model",
display_name="Embedding Model",
input_types=["Embeddings"],
info="Allows an embedding model configuration.",
),
DataInput(
name="ingest_data",
display_name="Ingest Data",
),
MultilineInput(
name="search_input",
display_name="Search Input",
display_name="Search Query",
tool_mode=True,
),
IntInput(
name="number_of_results",
display_name="Number of Results",
info="Number of results to return.",
display_name="Number of Search Results",
info="Number of search results to return.",
advanced=True,
value=4,
),
Expand All @@ -147,98 +164,6 @@ class AstraDBVectorStoreComponent(LCVectorStoreComponent):
info="Optional dictionary of filters to apply to the search query.",
advanced=True,
),
DictInput(
name="search_filter",
display_name="[DEPRECATED] Search Metadata Filter",
info="Deprecated: use advanced_search_filter. Optional dictionary of filters to apply to the search query.",
advanced=True,
list=True,
),
DataInput(
name="ingest_data",
display_name="Ingest Data",
),
DropdownInput(
name="embedding_choice",
display_name="Embedding Model or Astra Vectorize",
info="Determines whether to use Astra Vectorize for the collection.",
options=["Embedding Model", "Astra Vectorize"],
real_time_refresh=True,
value="Embedding Model",
),
HandleInput(
name="embedding_model",
display_name="Embedding Model",
input_types=["Embeddings"],
info="Allows an embedding model configuration.",
),
DropdownInput(
name="metric",
display_name="Metric",
info="Optional distance metric for vector comparisons in the vector store.",
options=["cosine", "dot_product", "euclidean"],
value="cosine",
advanced=True,
),
IntInput(
name="batch_size",
display_name="Batch Size",
info="Optional number of data to process in a single batch.",
advanced=True,
),
IntInput(
name="bulk_insert_batch_concurrency",
display_name="Bulk Insert Batch Concurrency",
info="Optional concurrency level for bulk insert operations.",
advanced=True,
),
IntInput(
name="bulk_insert_overwrite_concurrency",
display_name="Bulk Insert Overwrite Concurrency",
info="Optional concurrency level for bulk insert operations that overwrite existing data.",
advanced=True,
),
IntInput(
name="bulk_delete_concurrency",
display_name="Bulk Delete Concurrency",
info="Optional concurrency level for bulk delete operations.",
advanced=True,
),
DropdownInput(
name="setup_mode",
display_name="Setup Mode",
info="Configuration mode for setting up the vector store, with options like 'Sync' or 'Off'.",
options=["Sync", "Off"],
advanced=True,
value="Sync",
),
BoolInput(
name="pre_delete_collection",
display_name="Pre Delete Collection",
info="Boolean flag to determine whether to delete the collection before creating a new one.",
advanced=True,
),
StrInput(
name="metadata_indexing_include",
display_name="Metadata Indexing Include",
info="Optional list of metadata fields to include in the indexing.",
list=True,
advanced=True,
),
StrInput(
name="metadata_indexing_exclude",
display_name="Metadata Indexing Exclude",
info="Optional list of metadata fields to exclude from the indexing.",
list=True,
advanced=True,
),
StrInput(
name="collection_indexing_policy",
display_name="Collection Indexing Policy",
info='Optional JSON string for the "indexing" field of the collection. '
"See https://docs.datastax.com/en/astra-db-serverless/api-reference/collections.html#the-indexing-option",
advanced=True,
),
StrInput(
name="content_field",
display_name="Content Field",
Expand All @@ -251,6 +176,12 @@ class AstraDBVectorStoreComponent(LCVectorStoreComponent):
info="Boolean flag to determine whether to ignore invalid documents at runtime.",
advanced=True,
),
NestedDictInput(
name="astradb_vectorstore_kwargs",
display_name="AstraDBVectorStore Parameters",
info="Optional dictionary of additional parameters for the AstraDBVectorStore.",
advanced=True,
),
]

def del_fields(self, build_config, field_list):
Expand Down Expand Up @@ -586,57 +517,21 @@ def build_vectorize_options(self, **kwargs):
def build_vector_store(self, vectorize_options=None):
try:
from langchain_astradb import AstraDBVectorStore
from langchain_astradb.utils.astradb import SetupMode
except ImportError as e:
msg = (
"Could not import langchain Astra DB integration package. "
"Please install it with `pip install langchain-astradb`."
)
raise ImportError(msg) from e

try:
if not self.setup_mode:
self.setup_mode = self._inputs["setup_mode"].options[0]

setup_mode_value = SetupMode[self.setup_mode.upper()]
except KeyError as e:
msg = f"Invalid setup mode: {self.setup_mode}"
raise ValueError(msg) from e

# Initialize parameters based on the collection name
is_new_collection = self.collection_name == "+ Create new collection"

# Build the list of autodetect parameters
autodetect_params = {
"autodetect": not is_new_collection,
"metric_value": self.metric if is_new_collection else None,
"metadata_indexing_include": (
[s for s in self.metadata_indexing_include if s] or None if is_new_collection else None
),
"metadata_indexing_exclude": (
[s for s in self.metadata_indexing_exclude if s] or None if is_new_collection else None
),
"collection_indexing_policy": (
orjson.dumps(self.collection_indexing_policy)
if is_new_collection and self.collection_indexing_policy
else None
),
"setup_mode": setup_mode_value if is_new_collection else None,
}

# Unpack parameters
autodetect = autodetect_params["autodetect"]
metric_value = autodetect_params["metric_value"]
metadata_indexing_include = autodetect_params["metadata_indexing_include"]
metadata_indexing_exclude = autodetect_params["metadata_indexing_exclude"]
collection_indexing_policy = autodetect_params["collection_indexing_policy"]
setup_mode = autodetect_params["setup_mode"]

# Get the embedding model
embedding_dict = {"embedding": self.embedding_model} if self.embedding_choice == "Embedding Model" else {}
embedding_params = {"embedding": self.embedding_model} if self.embedding_choice == "Embedding Model" else {}

# Use the embedding model if the choice is set to "Embedding Model"
if self.embedding_choice == "Astra Vectorize" and not autodetect:
if self.embedding_choice == "Astra Vectorize" and is_new_collection:
from astrapy.info import CollectionVectorServiceOptions

# Build the vectorize options dictionary
Expand All @@ -650,7 +545,7 @@ def build_vector_store(self, vectorize_options=None):
)

# Set the embedding dictionary
embedding_dict = {
embedding_params = {
"collection_vector_service_options": CollectionVectorServiceOptions.from_dict(
dict_options.get("collection_vector_service_options")
),
Expand All @@ -670,29 +565,28 @@ def build_vector_store(self, vectorize_options=None):
if os.getenv("LANGFLOW_HOST") is not None:
langflow_prefix = "ds-"

# Bundle up the auto-detect parameters
autodetect_params = {
"autodetect_collection": not is_new_collection, # TODO: May want to expose this option
"content_field": self.content_field or None,
"ignore_invalid_documents": self.ignore_invalid_documents,
}

# Attempt to build the Vector Store object
try:
vector_store = AstraDBVectorStore(
# Astra DB Authentication Parameters
token=self.token,
api_endpoint=self.api_endpoint,
namespace=self.keyspace or None,
collection_name=self.get_collection_choice(),
autodetect_collection=autodetect,
content_field=self.content_field or None,
ignore_invalid_documents=self.ignore_invalid_documents,
environment=environment,
metric=metric_value,
batch_size=self.batch_size or None,
bulk_insert_batch_concurrency=self.bulk_insert_batch_concurrency or None,
bulk_insert_overwrite_concurrency=self.bulk_insert_overwrite_concurrency or None,
bulk_delete_concurrency=self.bulk_delete_concurrency or None,
setup_mode=setup_mode,
pre_delete_collection=self.pre_delete_collection,
metadata_indexing_include=metadata_indexing_include,
metadata_indexing_exclude=metadata_indexing_exclude,
collection_indexing_policy=collection_indexing_policy,
# Astra DB Usage Tracking Parameters
ext_callers=[(f"{langflow_prefix}langflow", __version__)],
**embedding_dict,
# Astra DB Vector Store Parameters
**autodetect_params,
**embedding_params,
**self.astradb_vectorstore_kwargs,
)
except Exception as e:
msg = f"Error initializing AstraDBVectorStore: {e}"
Expand Down Expand Up @@ -730,9 +624,6 @@ def _map_search_type(self) -> str:

def _build_search_args(self):
query = self.search_input if isinstance(self.search_input, str) and self.search_input.strip() else None
search_filter = (
{k: v for k, v in self.search_filter.items() if k and v and k.strip()} if self.search_filter else None
)

if query:
args = {
Expand All @@ -741,19 +632,14 @@ def _build_search_args(self):
"k": self.number_of_results,
"score_threshold": self.search_score_threshold,
}
elif self.advanced_search_filter or search_filter:
elif self.advanced_search_filter:
args = {
"n": self.number_of_results,
}
else:
return {}

filter_arg = self.advanced_search_filter or {}

if search_filter:
self.log(self.log(f"`search_filter` is deprecated. Use `advanced_search_filter`. Cleaned: {search_filter}"))
filter_arg.update(search_filter)

if filter_arg:
args["filter"] = filter_arg

Expand Down
Loading

0 comments on commit a3d238c

Please sign in to comment.