From b28fcd731524a2d5ba3b4fa0f3f4da91b27d307f Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Thu, 19 Dec 2024 21:14:14 +0000 Subject: [PATCH] [autofix.ci] apply automated fixes --- .../starter_projects/Vector Store RAG.json | 48 ++++++++++++++++++- 1 file changed, 46 insertions(+), 2 deletions(-) diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Vector Store RAG.json b/src/backend/base/langflow/initial_setup/starter_projects/Vector Store RAG.json index b322606d1091..926d41720c60 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Vector Store RAG.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Vector Store RAG.json @@ -3050,6 +3050,7 @@ "method": "search_documents", "name": "search_results", "required_inputs": [ + "api_endpoint", "collection_name", "database_name", "token" @@ -3081,6 +3082,27 @@ "type": "NestedDict", "value": {} }, + "api_endpoint": { + "_input_type": "SecretStrInput", + "advanced": true, + "display_name": "API Endpoint", + "dynamic": false, + "info": "The Astra DB API Endpoint to use. Overrides selection of database.", + "input_types": [ + "Message" + ], + "load_from_db": true, + "name": "api_endpoint", + "password": true, + "placeholder": "", + "real_time_refresh": true, + "refresh_button": true, + "required": true, + "show": true, + "title_case": false, + "type": "str", + "value": "" + }, "astradb_vectorstore_kwargs": { "_input_type": "NestedDictInput", "advanced": true, @@ -3114,7 +3136,7 @@ "show": true, "title_case": false, "type": "code", - "value": "import os\n\nfrom astrapy import DataAPIClient\nfrom astrapy.admin import parse_api_endpoint\nfrom langchain_astradb import AstraDBVectorStore\n\nfrom langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store\nfrom langflow.helpers import docs_to_data\nfrom langflow.inputs import DictInput, FloatInput, MessageTextInput, NestedDictInput\nfrom langflow.io import (\n BoolInput,\n DataInput,\n DropdownInput,\n HandleInput,\n IntInput,\n SecretStrInput,\n StrInput,\n)\nfrom langflow.schema import Data\nfrom langflow.utils.version import get_version_info\n\n\nclass AstraDBVectorStoreComponent(LCVectorStoreComponent):\n display_name: str = \"Astra DB\"\n description: str = \"Ingest and search documents in Astra DB\"\n documentation: str = \"https://docs.datastax.com/en/langflow/astra-components.html\"\n name = \"AstraDB\"\n icon: str = \"AstraDB\"\n\n _cached_vector_store: AstraDBVectorStore | None = None\n\n class NewDatabaseInput(DictInput):\n title: str = \"Create New Database\"\n description: str = \"Create a new database in Astra DB.\"\n db_names: list[str] = []\n status: str = \"\"\n collection_count: int = 0\n record_count: int = 0\n\n class NewCollectionInput(DictInput):\n title: str = \"Create New Collection\"\n description: str = \"Create a new collection in Astra DB.\"\n status: str = \"\"\n dimensions: int = 0\n model: str = \"\"\n similarity_metrics: list[str] = []\n icon: str = \"Collection\"\n\n base_inputs = LCVectorStoreComponent.inputs\n if \"search_query\" not in [input_.name for input_ in base_inputs]:\n base_inputs.append(\n MessageTextInput(\n name=\"search_query\",\n display_name=\"Search Query\",\n tool_mode=True,\n )\n )\n if \"ingest_data\" not in [input_.name for input_ in base_inputs]:\n base_inputs.append(\n DataInput(\n name=\"ingest_data\",\n display_name=\"Ingest Data\",\n )\n )\n\n inputs = [\n SecretStrInput(\n name=\"token\",\n display_name=\"Astra DB Application Token\",\n info=\"Authentication token for accessing Astra DB.\",\n value=\"ASTRA_DB_APPLICATION_TOKEN\",\n required=True,\n advanced=os.getenv(\"ASTRA_ENHANCED\", \"false\").lower() == \"true\",\n real_time_refresh=True,\n ),\n DropdownInput(\n name=\"database_name\",\n display_name=\"Database\",\n info=\"Select a database in Astra DB.\",\n required=True,\n refresh_button=True,\n real_time_refresh=True,\n has_dialog=True, # New\n dialog_input=[NewDatabaseInput(name=\"database_input\").__dict__],\n options=[],\n value=\"\",\n ),\n DropdownInput(\n name=\"collection_name\",\n display_name=\"Collection\",\n info=\"The name of the collection within Astra DB where the vectors will be stored.\",\n required=True,\n refresh_button=True,\n real_time_refresh=True,\n has_dialog=True,\n dialog_input=[NewCollectionInput(name=\"collection_input\").__dict__],\n options=[],\n value=\"\",\n ),\n StrInput(\n name=\"keyspace\",\n display_name=\"Keyspace\",\n info=\"Optional keyspace within Astra DB to use for the collection.\",\n advanced=True,\n ),\n HandleInput(\n name=\"embedding_model\",\n display_name=\"Embedding Model\",\n input_types=[\"Embeddings\"],\n info=\"Allows an embedding model configuration.\",\n ),\n *base_inputs,\n IntInput(\n name=\"number_of_results\",\n display_name=\"Number of Search Results\",\n info=\"Number of search results to return.\",\n advanced=True,\n value=4,\n ),\n DropdownInput(\n name=\"search_type\",\n display_name=\"Search Type\",\n info=\"Search type to use\",\n options=[\"Similarity\", \"Similarity with score threshold\", \"MMR (Max Marginal Relevance)\"],\n value=\"Similarity\",\n advanced=True,\n ),\n FloatInput(\n name=\"search_score_threshold\",\n display_name=\"Search Score Threshold\",\n info=\"Minimum similarity score threshold for search results. \"\n \"(when using 'Similarity with score threshold')\",\n value=0,\n advanced=True,\n ),\n NestedDictInput(\n name=\"advanced_search_filter\",\n display_name=\"Search Metadata Filter\",\n info=\"Optional dictionary of filters to apply to the search query.\",\n advanced=True,\n ),\n StrInput(\n name=\"content_field\",\n display_name=\"Content Field\",\n info=\"Field to use as the text content field for the vector store.\",\n advanced=True,\n ),\n BoolInput(\n name=\"ignore_invalid_documents\",\n display_name=\"Ignore Invalid Documents\",\n info=\"Boolean flag to determine whether to ignore invalid documents at runtime.\",\n advanced=True,\n ),\n NestedDictInput(\n name=\"astradb_vectorstore_kwargs\",\n display_name=\"AstraDBVectorStore Parameters\",\n info=\"Optional dictionary of additional parameters for the AstraDBVectorStore.\",\n advanced=True,\n ),\n ]\n\n def get_database_list(self):\n # Get the admin object\n client = DataAPIClient(token=self.token)\n admin_client = client.get_admin(token=self.token)\n db_list = list(admin_client.list_databases())\n\n # Generate the api endpoint for each database\n return {\n db.info.name: {\n \"api_endpoint\": f\"https://{db.info.id}-{db.info.region}.apps.astra.datastax.com\",\n \"collections\": len(list(client.get_database(db.info.id, token=self.token).list_collection_names())),\n \"records\": 0,\n }\n for db in db_list\n }\n\n def get_api_endpoint(self):\n # If the database is not set, get the first database in the list\n if not self.database_name:\n return None\n\n # Otherwise, get the URL from the database list\n return self.get_database_list().get(self.database_name)\n\n def get_database(self):\n try:\n client = DataAPIClient(token=self.token)\n\n return client.get_database(\n self.get_api_endpoint(),\n token=self.token,\n )\n except Exception as e: # noqa: BLE001\n self.log(f\"Error getting database: {e}\")\n\n return None\n\n def _initialize_database_options(self):\n try:\n return [\n {\"name\": name, \"collections\": info[\"collections\"], \"records\": info[\"records\"]}\n for name, info in self.get_database_list().items()\n ]\n except Exception as e: # noqa: BLE001\n self.log(f\"Error fetching databases: {e}\")\n\n return []\n\n def _initialize_collection_options(self):\n database = self.get_database()\n if database is None:\n return []\n\n try:\n collection_list = list(database.list_collections())\n\n return [\n {\n \"name\": col.name,\n \"records\": 0,\n \"provider\": col.options.vector.service.provider if col.options.vector else \"\",\n \"model\": col.options.vector.service.model_name if col.options.vector else \"\",\n }\n for col in collection_list\n ]\n except Exception as e: # noqa: BLE001\n self.log(f\"Error fetching collections: {e}\")\n\n return []\n\n def update_build_config(self, build_config: dict, field_value: str, field_name: str | None = None): # noqa: ARG002\n # Refresh the collection name options\n build_config[\"database_name\"][\"options\"] = self._initialize_database_options()\n build_config[\"collection_name\"][\"options\"] = self._initialize_collection_options()\n\n return build_config\n\n @check_cached_vector_store\n def build_vector_store(self):\n try:\n from langchain_astradb import AstraDBVectorStore\n except ImportError as e:\n msg = (\n \"Could not import langchain Astra DB integration package. \"\n \"Please install it with `pip install langchain-astradb`.\"\n )\n raise ImportError(msg) from e\n\n # Get the embedding model\n embedding_params = {\"embedding\": self.embedding_model} if self.embedding_choice == \"Embedding Model\" else {}\n\n # Get the running environment for Langflow\n environment = (\n parse_api_endpoint(self.get_api_endpoint()).environment if self.get_api_endpoint() is not None else None\n )\n\n # Get Langflow version and platform information\n __version__ = get_version_info()[\"version\"]\n langflow_prefix = \"\"\n if os.getenv(\"LANGFLOW_HOST\") is not None:\n langflow_prefix = \"ds-\"\n\n # Bundle up the auto-detect parameters\n autodetect_params = {\n \"autodetect_collection\": True, # TODO: May want to expose this option\n \"content_field\": self.content_field or None,\n \"ignore_invalid_documents\": self.ignore_invalid_documents,\n }\n\n # Attempt to build the Vector Store object\n try:\n vector_store = AstraDBVectorStore(\n # Astra DB Authentication Parameters\n token=self.token,\n api_endpoint=self.get_api_endpoint(),\n namespace=self.keyspace or None,\n collection_name=self.collection_name,\n environment=environment,\n # Astra DB Usage Tracking Parameters\n ext_callers=[(f\"{langflow_prefix}langflow\", __version__)],\n # Astra DB Vector Store Parameters\n **autodetect_params,\n **embedding_params,\n **self.astradb_vectorstore_kwargs,\n )\n except Exception as e:\n msg = f\"Error initializing AstraDBVectorStore: {e}\"\n raise ValueError(msg) from e\n\n self._add_documents_to_vector_store(vector_store)\n\n return vector_store\n\n def _add_documents_to_vector_store(self, vector_store) -> None:\n documents = []\n for _input in self.ingest_data or []:\n if isinstance(_input, Data):\n documents.append(_input.to_lc_document())\n else:\n msg = \"Vector Store Inputs must be Data objects.\"\n raise TypeError(msg)\n\n if documents:\n self.log(f\"Adding {len(documents)} documents to the Vector Store.\")\n try:\n vector_store.add_documents(documents)\n except Exception as e:\n msg = f\"Error adding documents to AstraDBVectorStore: {e}\"\n raise ValueError(msg) from e\n else:\n self.log(\"No documents to add to the Vector Store.\")\n\n def _map_search_type(self) -> str:\n if self.search_type == \"Similarity with score threshold\":\n return \"similarity_score_threshold\"\n if self.search_type == \"MMR (Max Marginal Relevance)\":\n return \"mmr\"\n return \"similarity\"\n\n def _build_search_args(self):\n query = self.search_query if isinstance(self.search_query, str) and self.search_query.strip() else None\n\n if query:\n args = {\n \"query\": query,\n \"search_type\": self._map_search_type(),\n \"k\": self.number_of_results,\n \"score_threshold\": self.search_score_threshold,\n }\n elif self.advanced_search_filter:\n args = {\n \"n\": self.number_of_results,\n }\n else:\n return {}\n\n filter_arg = self.advanced_search_filter or {}\n if filter_arg:\n args[\"filter\"] = filter_arg\n\n return args\n\n def search_documents(self, vector_store=None) -> list[Data]:\n vector_store = vector_store or self.build_vector_store()\n\n self.log(f\"Search input: {self.search_query}\")\n self.log(f\"Search type: {self.search_type}\")\n self.log(f\"Number of results: {self.number_of_results}\")\n\n try:\n search_args = self._build_search_args()\n except Exception as e:\n msg = f\"Error in AstraDBVectorStore._build_search_args: {e}\"\n raise ValueError(msg) from e\n\n if not search_args:\n self.log(\"No search input or filters provided. Skipping search.\")\n return []\n\n docs = []\n search_method = \"search\" if \"query\" in search_args else \"metadata_search\"\n\n try:\n self.log(f\"Calling vector_store.{search_method} with args: {search_args}\")\n docs = getattr(vector_store, search_method)(**search_args)\n except Exception as e:\n msg = f\"Error performing {search_method} in AstraDBVectorStore: {e}\"\n raise ValueError(msg) from e\n\n self.log(f\"Retrieved documents: {len(docs)}\")\n\n data = docs_to_data(docs)\n self.log(f\"Converted documents to data: {len(data)}\")\n self.status = data\n return data\n\n def get_retriever_kwargs(self):\n search_args = self._build_search_args()\n return {\n \"search_type\": self._map_search_type(),\n \"search_kwargs\": search_args,\n }\n" + "value": "import os\n\nfrom astrapy import DataAPIClient\nfrom astrapy.admin import parse_api_endpoint\nfrom langchain_astradb import AstraDBVectorStore\n\nfrom langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store\nfrom langflow.helpers import docs_to_data\nfrom langflow.inputs import DictInput, FloatInput, MessageTextInput, NestedDictInput\nfrom langflow.io import (\n BoolInput,\n DataInput,\n DropdownInput,\n HandleInput,\n IntInput,\n SecretStrInput,\n StrInput,\n)\nfrom langflow.schema import Data\nfrom langflow.utils.version import get_version_info\n\n\nclass AstraDBVectorStoreComponent(LCVectorStoreComponent):\n display_name: str = \"Astra DB\"\n description: str = \"Ingest and search documents in Astra DB\"\n documentation: str = \"https://docs.datastax.com/en/langflow/astra-components.html\"\n name = \"AstraDB\"\n icon: str = \"AstraDB\"\n\n _cached_vector_store: AstraDBVectorStore | None = None\n\n class NewDatabaseInput(DictInput):\n title: str = \"Create New Database\"\n description: str = \"Create a new database in Astra DB.\"\n db_names: list[str] = []\n status: str = \"\"\n collection_count: int = 0\n record_count: int = 0\n\n class NewCollectionInput(DictInput):\n title: str = \"Create New Collection\"\n description: str = \"Create a new collection in Astra DB.\"\n status: str = \"\"\n dimensions: int = 0\n model: str = \"\"\n similarity_metrics: list[str] = []\n icon: str = \"Collection\"\n\n base_inputs = LCVectorStoreComponent.inputs\n if \"search_query\" not in [input_.name for input_ in base_inputs]:\n base_inputs.append(\n MessageTextInput(\n name=\"search_query\",\n display_name=\"Search Query\",\n tool_mode=True,\n )\n )\n if \"ingest_data\" not in [input_.name for input_ in base_inputs]:\n base_inputs.append(\n DataInput(\n name=\"ingest_data\",\n display_name=\"Ingest Data\",\n )\n )\n\n inputs = [\n SecretStrInput(\n name=\"token\",\n display_name=\"Astra DB Application Token\",\n info=\"Authentication token for accessing Astra DB.\",\n value=\"ASTRA_DB_APPLICATION_TOKEN\",\n required=True,\n advanced=os.getenv(\"ASTRA_ENHANCED\", \"false\").lower() == \"true\",\n real_time_refresh=True,\n ),\n SecretStrInput(\n name=\"api_endpoint\",\n display_name=\"API Endpoint\",\n info=\"The Astra DB API Endpoint to use. Overrides selection of database.\",\n required=True,\n refresh_button=True,\n real_time_refresh=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"database_name\",\n display_name=\"Database\",\n info=\"Select a database in Astra DB.\",\n required=True,\n refresh_button=True,\n real_time_refresh=True,\n dialog_inputs=[NewDatabaseInput(name=\"database_input\").__dict__],\n options=[],\n value=\"\",\n ),\n DropdownInput(\n name=\"collection_name\",\n display_name=\"Collection\",\n info=\"The name of the collection within Astra DB where the vectors will be stored.\",\n required=True,\n refresh_button=True,\n real_time_refresh=True,\n dialog_inputs=[NewCollectionInput(name=\"collection_input\").__dict__],\n options=[],\n value=\"\",\n ),\n StrInput(\n name=\"keyspace\",\n display_name=\"Keyspace\",\n info=\"Optional keyspace within Astra DB to use for the collection.\",\n advanced=True,\n ),\n HandleInput(\n name=\"embedding_model\",\n display_name=\"Embedding Model\",\n input_types=[\"Embeddings\"],\n info=\"Allows an embedding model configuration.\",\n ),\n *base_inputs,\n IntInput(\n name=\"number_of_results\",\n display_name=\"Number of Search Results\",\n info=\"Number of search results to return.\",\n advanced=True,\n value=4,\n ),\n DropdownInput(\n name=\"search_type\",\n display_name=\"Search Type\",\n info=\"Search type to use\",\n options=[\"Similarity\", \"Similarity with score threshold\", \"MMR (Max Marginal Relevance)\"],\n value=\"Similarity\",\n advanced=True,\n ),\n FloatInput(\n name=\"search_score_threshold\",\n display_name=\"Search Score Threshold\",\n info=\"Minimum similarity score threshold for search results. \"\n \"(when using 'Similarity with score threshold')\",\n value=0,\n advanced=True,\n ),\n NestedDictInput(\n name=\"advanced_search_filter\",\n display_name=\"Search Metadata Filter\",\n info=\"Optional dictionary of filters to apply to the search query.\",\n advanced=True,\n ),\n StrInput(\n name=\"content_field\",\n display_name=\"Content Field\",\n info=\"Field to use as the text content field for the vector store.\",\n advanced=True,\n ),\n BoolInput(\n name=\"ignore_invalid_documents\",\n display_name=\"Ignore Invalid Documents\",\n info=\"Boolean flag to determine whether to ignore invalid documents at runtime.\",\n advanced=True,\n ),\n NestedDictInput(\n name=\"astradb_vectorstore_kwargs\",\n display_name=\"AstraDBVectorStore Parameters\",\n info=\"Optional dictionary of additional parameters for the AstraDBVectorStore.\",\n advanced=True,\n ),\n ]\n\n def get_database_list(self):\n # Get the admin object\n client = DataAPIClient(token=self.token)\n admin_client = client.get_admin(token=self.token)\n db_list = list(admin_client.list_databases())\n\n # Generate the api endpoint for each database\n return {\n db.info.name: {\n \"api_endpoint\": f\"https://{db.info.id}-{db.info.region}.apps.astra.datastax.com\",\n \"collections\": len(list(client.get_database(db.info.id, token=self.token).list_collection_names())),\n \"records\": 0,\n }\n for db in db_list\n }\n\n def get_api_endpoint(self):\n # If the API endpoint is set, return it\n if self.api_endpoint:\n return self.api_endpoint\n\n # If the database is not set, nothing we can do.\n if not self.database_name:\n return None\n\n # Otherwise, get the URL from the database list\n return self.get_database_list().get(self.database_name)\n\n def get_database(self):\n try:\n client = DataAPIClient(token=self.token)\n\n return client.get_database(\n self.get_api_endpoint(),\n token=self.token,\n )\n except Exception as e: # noqa: BLE001\n self.log(f\"Error getting database: {e}\")\n\n return None\n\n def collection_exists(self):\n try:\n client = DataAPIClient(token=self.token)\n database = client.get_database(\n self.get_api_endpoint(),\n token=self.token,\n )\n return self.collection_name in list(database.list_collections())\n except Exception as e: # noqa: BLE001\n self.log(f\"Error getting collection status: {e}\")\n\n return False\n\n def _initialize_database_options(self):\n try:\n return [\n {\"name\": name, \"collections\": info[\"collections\"], \"records\": info[\"records\"]}\n for name, info in self.get_database_list().items()\n ]\n except Exception as e: # noqa: BLE001\n self.log(f\"Error fetching databases: {e}\")\n\n return []\n\n def _initialize_collection_options(self):\n database = self.get_database()\n if database is None:\n return []\n\n try:\n collection_list = list(database.list_collections())\n\n return [\n {\n \"name\": col.name,\n \"records\": 0,\n \"provider\": col.options.vector.service.provider if col.options.vector else \"\",\n \"model\": col.options.vector.service.model_name if col.options.vector else \"\",\n }\n for col in collection_list\n ]\n except Exception as e: # noqa: BLE001\n self.log(f\"Error fetching collections: {e}\")\n\n return []\n\n def update_build_config(self, build_config: dict, field_value: str, field_name: str | None = None): # noqa: ARG002\n # Refresh the collection name options\n build_config[\"database_name\"][\"options\"] = self._initialize_database_options()\n build_config[\"collection_name\"][\"options\"] = self._initialize_collection_options()\n\n return build_config\n\n @check_cached_vector_store\n def build_vector_store(self):\n try:\n from langchain_astradb import AstraDBVectorStore\n except ImportError as e:\n msg = (\n \"Could not import langchain Astra DB integration package. \"\n \"Please install it with `pip install langchain-astradb`.\"\n )\n raise ImportError(msg) from e\n\n # Get the embedding model and additional params\n embedding_params = {\"embedding\": self.embedding_model} if self.embedding_model else {}\n additional_params = self.astradb_vectorstore_kwargs or {}\n\n # Get the running environment for Langflow\n environment = (\n parse_api_endpoint(self.get_api_endpoint()).environment if self.get_api_endpoint() is not None else None\n )\n\n # Get Langflow version and platform information\n __version__ = get_version_info()[\"version\"]\n langflow_prefix = \"\"\n if os.getenv(\"LANGFLOW_HOST\") is not None:\n langflow_prefix = \"ds-\"\n\n # Bundle up the auto-detect parameters\n autodetect_params = {\n \"autodetect_collection\": self.collection_exists(), # TODO: May want to expose this option\n \"content_field\": self.content_field or None,\n \"ignore_invalid_documents\": self.ignore_invalid_documents,\n }\n\n # Attempt to build the Vector Store object\n try:\n vector_store = AstraDBVectorStore(\n # Astra DB Authentication Parameters\n token=self.token,\n api_endpoint=self.get_api_endpoint(),\n namespace=self.keyspace or None,\n collection_name=self.collection_name,\n environment=environment,\n # Astra DB Usage Tracking Parameters\n ext_callers=[(f\"{langflow_prefix}langflow\", __version__)],\n # Astra DB Vector Store Parameters\n **autodetect_params,\n **embedding_params,\n **additional_params,\n )\n except Exception as e:\n msg = f\"Error initializing AstraDBVectorStore: {e}\"\n raise ValueError(msg) from e\n\n self._add_documents_to_vector_store(vector_store)\n\n return vector_store\n\n def _add_documents_to_vector_store(self, vector_store) -> None:\n documents = []\n for _input in self.ingest_data or []:\n if isinstance(_input, Data):\n documents.append(_input.to_lc_document())\n else:\n msg = \"Vector Store Inputs must be Data objects.\"\n raise TypeError(msg)\n\n if documents:\n self.log(f\"Adding {len(documents)} documents to the Vector Store.\")\n try:\n vector_store.add_documents(documents)\n except Exception as e:\n msg = f\"Error adding documents to AstraDBVectorStore: {e}\"\n raise ValueError(msg) from e\n else:\n self.log(\"No documents to add to the Vector Store.\")\n\n def _map_search_type(self) -> str:\n if self.search_type == \"Similarity with score threshold\":\n return \"similarity_score_threshold\"\n if self.search_type == \"MMR (Max Marginal Relevance)\":\n return \"mmr\"\n return \"similarity\"\n\n def _build_search_args(self):\n query = self.search_query if isinstance(self.search_query, str) and self.search_query.strip() else None\n\n if query:\n args = {\n \"query\": query,\n \"search_type\": self._map_search_type(),\n \"k\": self.number_of_results,\n \"score_threshold\": self.search_score_threshold,\n }\n elif self.advanced_search_filter:\n args = {\n \"n\": self.number_of_results,\n }\n else:\n return {}\n\n filter_arg = self.advanced_search_filter or {}\n if filter_arg:\n args[\"filter\"] = filter_arg\n\n return args\n\n def search_documents(self, vector_store=None) -> list[Data]:\n vector_store = vector_store or self.build_vector_store()\n\n self.log(f\"Search input: {self.search_query}\")\n self.log(f\"Search type: {self.search_type}\")\n self.log(f\"Number of results: {self.number_of_results}\")\n\n try:\n search_args = self._build_search_args()\n except Exception as e:\n msg = f\"Error in AstraDBVectorStore._build_search_args: {e}\"\n raise ValueError(msg) from e\n\n if not search_args:\n self.log(\"No search input or filters provided. Skipping search.\")\n return []\n\n docs = []\n search_method = \"search\" if \"query\" in search_args else \"metadata_search\"\n\n try:\n self.log(f\"Calling vector_store.{search_method} with args: {search_args}\")\n docs = getattr(vector_store, search_method)(**search_args)\n except Exception as e:\n msg = f\"Error performing {search_method} in AstraDBVectorStore: {e}\"\n raise ValueError(msg) from e\n\n self.log(f\"Retrieved documents: {len(docs)}\")\n\n data = docs_to_data(docs)\n self.log(f\"Converted documents to data: {len(data)}\")\n self.status = data\n return data\n\n def get_retriever_kwargs(self):\n search_args = self._build_search_args()\n return {\n \"search_type\": self._map_search_type(),\n \"search_kwargs\": search_args,\n }\n" }, "collection_name": { "_input_type": "DropdownInput", @@ -3438,6 +3460,7 @@ "method": "search_documents", "name": "search_results", "required_inputs": [ + "api_endpoint", "collection_name", "database_name", "token" @@ -3469,6 +3492,27 @@ "type": "NestedDict", "value": {} }, + "api_endpoint": { + "_input_type": "SecretStrInput", + "advanced": true, + "display_name": "API Endpoint", + "dynamic": false, + "info": "The Astra DB API Endpoint to use. Overrides selection of database.", + "input_types": [ + "Message" + ], + "load_from_db": true, + "name": "api_endpoint", + "password": true, + "placeholder": "", + "real_time_refresh": true, + "refresh_button": true, + "required": true, + "show": true, + "title_case": false, + "type": "str", + "value": "" + }, "astradb_vectorstore_kwargs": { "_input_type": "NestedDictInput", "advanced": true, @@ -3502,7 +3546,7 @@ "show": true, "title_case": false, "type": "code", - "value": "import os\n\nfrom astrapy import DataAPIClient\nfrom astrapy.admin import parse_api_endpoint\nfrom langchain_astradb import AstraDBVectorStore\n\nfrom langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store\nfrom langflow.helpers import docs_to_data\nfrom langflow.inputs import DictInput, FloatInput, MessageTextInput, NestedDictInput\nfrom langflow.io import (\n BoolInput,\n DataInput,\n DropdownInput,\n HandleInput,\n IntInput,\n SecretStrInput,\n StrInput,\n)\nfrom langflow.schema import Data\nfrom langflow.utils.version import get_version_info\n\n\nclass AstraDBVectorStoreComponent(LCVectorStoreComponent):\n display_name: str = \"Astra DB\"\n description: str = \"Ingest and search documents in Astra DB\"\n documentation: str = \"https://docs.datastax.com/en/langflow/astra-components.html\"\n name = \"AstraDB\"\n icon: str = \"AstraDB\"\n\n _cached_vector_store: AstraDBVectorStore | None = None\n\n class NewDatabaseInput(DictInput):\n title: str = \"Create New Database\"\n description: str = \"Create a new database in Astra DB.\"\n db_names: list[str] = []\n status: str = \"\"\n collection_count: int = 0\n record_count: int = 0\n\n class NewCollectionInput(DictInput):\n title: str = \"Create New Collection\"\n description: str = \"Create a new collection in Astra DB.\"\n status: str = \"\"\n dimensions: int = 0\n model: str = \"\"\n similarity_metrics: list[str] = []\n icon: str = \"Collection\"\n\n base_inputs = LCVectorStoreComponent.inputs\n if \"search_query\" not in [input_.name for input_ in base_inputs]:\n base_inputs.append(\n MessageTextInput(\n name=\"search_query\",\n display_name=\"Search Query\",\n tool_mode=True,\n )\n )\n if \"ingest_data\" not in [input_.name for input_ in base_inputs]:\n base_inputs.append(\n DataInput(\n name=\"ingest_data\",\n display_name=\"Ingest Data\",\n )\n )\n\n inputs = [\n SecretStrInput(\n name=\"token\",\n display_name=\"Astra DB Application Token\",\n info=\"Authentication token for accessing Astra DB.\",\n value=\"ASTRA_DB_APPLICATION_TOKEN\",\n required=True,\n advanced=os.getenv(\"ASTRA_ENHANCED\", \"false\").lower() == \"true\",\n real_time_refresh=True,\n ),\n DropdownInput(\n name=\"database_name\",\n display_name=\"Database\",\n info=\"Select a database in Astra DB.\",\n required=True,\n refresh_button=True,\n real_time_refresh=True,\n has_dialog=True, # New\n dialog_input=[NewDatabaseInput(name=\"database_input\").__dict__],\n options=[],\n value=\"\",\n ),\n DropdownInput(\n name=\"collection_name\",\n display_name=\"Collection\",\n info=\"The name of the collection within Astra DB where the vectors will be stored.\",\n required=True,\n refresh_button=True,\n real_time_refresh=True,\n has_dialog=True,\n dialog_input=[NewCollectionInput(name=\"collection_input\").__dict__],\n options=[],\n value=\"\",\n ),\n StrInput(\n name=\"keyspace\",\n display_name=\"Keyspace\",\n info=\"Optional keyspace within Astra DB to use for the collection.\",\n advanced=True,\n ),\n HandleInput(\n name=\"embedding_model\",\n display_name=\"Embedding Model\",\n input_types=[\"Embeddings\"],\n info=\"Allows an embedding model configuration.\",\n ),\n *base_inputs,\n IntInput(\n name=\"number_of_results\",\n display_name=\"Number of Search Results\",\n info=\"Number of search results to return.\",\n advanced=True,\n value=4,\n ),\n DropdownInput(\n name=\"search_type\",\n display_name=\"Search Type\",\n info=\"Search type to use\",\n options=[\"Similarity\", \"Similarity with score threshold\", \"MMR (Max Marginal Relevance)\"],\n value=\"Similarity\",\n advanced=True,\n ),\n FloatInput(\n name=\"search_score_threshold\",\n display_name=\"Search Score Threshold\",\n info=\"Minimum similarity score threshold for search results. \"\n \"(when using 'Similarity with score threshold')\",\n value=0,\n advanced=True,\n ),\n NestedDictInput(\n name=\"advanced_search_filter\",\n display_name=\"Search Metadata Filter\",\n info=\"Optional dictionary of filters to apply to the search query.\",\n advanced=True,\n ),\n StrInput(\n name=\"content_field\",\n display_name=\"Content Field\",\n info=\"Field to use as the text content field for the vector store.\",\n advanced=True,\n ),\n BoolInput(\n name=\"ignore_invalid_documents\",\n display_name=\"Ignore Invalid Documents\",\n info=\"Boolean flag to determine whether to ignore invalid documents at runtime.\",\n advanced=True,\n ),\n NestedDictInput(\n name=\"astradb_vectorstore_kwargs\",\n display_name=\"AstraDBVectorStore Parameters\",\n info=\"Optional dictionary of additional parameters for the AstraDBVectorStore.\",\n advanced=True,\n ),\n ]\n\n def get_database_list(self):\n # Get the admin object\n client = DataAPIClient(token=self.token)\n admin_client = client.get_admin(token=self.token)\n db_list = list(admin_client.list_databases())\n\n # Generate the api endpoint for each database\n return {\n db.info.name: {\n \"api_endpoint\": f\"https://{db.info.id}-{db.info.region}.apps.astra.datastax.com\",\n \"collections\": len(list(client.get_database(db.info.id, token=self.token).list_collection_names())),\n \"records\": 0,\n }\n for db in db_list\n }\n\n def get_api_endpoint(self):\n # If the database is not set, get the first database in the list\n if not self.database_name:\n return None\n\n # Otherwise, get the URL from the database list\n return self.get_database_list().get(self.database_name)\n\n def get_database(self):\n try:\n client = DataAPIClient(token=self.token)\n\n return client.get_database(\n self.get_api_endpoint(),\n token=self.token,\n )\n except Exception as e: # noqa: BLE001\n self.log(f\"Error getting database: {e}\")\n\n return None\n\n def _initialize_database_options(self):\n try:\n return [\n {\"name\": name, \"collections\": info[\"collections\"], \"records\": info[\"records\"]}\n for name, info in self.get_database_list().items()\n ]\n except Exception as e: # noqa: BLE001\n self.log(f\"Error fetching databases: {e}\")\n\n return []\n\n def _initialize_collection_options(self):\n database = self.get_database()\n if database is None:\n return []\n\n try:\n collection_list = list(database.list_collections())\n\n return [\n {\n \"name\": col.name,\n \"records\": 0,\n \"provider\": col.options.vector.service.provider if col.options.vector else \"\",\n \"model\": col.options.vector.service.model_name if col.options.vector else \"\",\n }\n for col in collection_list\n ]\n except Exception as e: # noqa: BLE001\n self.log(f\"Error fetching collections: {e}\")\n\n return []\n\n def update_build_config(self, build_config: dict, field_value: str, field_name: str | None = None): # noqa: ARG002\n # Refresh the collection name options\n build_config[\"database_name\"][\"options\"] = self._initialize_database_options()\n build_config[\"collection_name\"][\"options\"] = self._initialize_collection_options()\n\n return build_config\n\n @check_cached_vector_store\n def build_vector_store(self):\n try:\n from langchain_astradb import AstraDBVectorStore\n except ImportError as e:\n msg = (\n \"Could not import langchain Astra DB integration package. \"\n \"Please install it with `pip install langchain-astradb`.\"\n )\n raise ImportError(msg) from e\n\n # Get the embedding model\n embedding_params = {\"embedding\": self.embedding_model} if self.embedding_choice == \"Embedding Model\" else {}\n\n # Get the running environment for Langflow\n environment = (\n parse_api_endpoint(self.get_api_endpoint()).environment if self.get_api_endpoint() is not None else None\n )\n\n # Get Langflow version and platform information\n __version__ = get_version_info()[\"version\"]\n langflow_prefix = \"\"\n if os.getenv(\"LANGFLOW_HOST\") is not None:\n langflow_prefix = \"ds-\"\n\n # Bundle up the auto-detect parameters\n autodetect_params = {\n \"autodetect_collection\": True, # TODO: May want to expose this option\n \"content_field\": self.content_field or None,\n \"ignore_invalid_documents\": self.ignore_invalid_documents,\n }\n\n # Attempt to build the Vector Store object\n try:\n vector_store = AstraDBVectorStore(\n # Astra DB Authentication Parameters\n token=self.token,\n api_endpoint=self.get_api_endpoint(),\n namespace=self.keyspace or None,\n collection_name=self.collection_name,\n environment=environment,\n # Astra DB Usage Tracking Parameters\n ext_callers=[(f\"{langflow_prefix}langflow\", __version__)],\n # Astra DB Vector Store Parameters\n **autodetect_params,\n **embedding_params,\n **self.astradb_vectorstore_kwargs,\n )\n except Exception as e:\n msg = f\"Error initializing AstraDBVectorStore: {e}\"\n raise ValueError(msg) from e\n\n self._add_documents_to_vector_store(vector_store)\n\n return vector_store\n\n def _add_documents_to_vector_store(self, vector_store) -> None:\n documents = []\n for _input in self.ingest_data or []:\n if isinstance(_input, Data):\n documents.append(_input.to_lc_document())\n else:\n msg = \"Vector Store Inputs must be Data objects.\"\n raise TypeError(msg)\n\n if documents:\n self.log(f\"Adding {len(documents)} documents to the Vector Store.\")\n try:\n vector_store.add_documents(documents)\n except Exception as e:\n msg = f\"Error adding documents to AstraDBVectorStore: {e}\"\n raise ValueError(msg) from e\n else:\n self.log(\"No documents to add to the Vector Store.\")\n\n def _map_search_type(self) -> str:\n if self.search_type == \"Similarity with score threshold\":\n return \"similarity_score_threshold\"\n if self.search_type == \"MMR (Max Marginal Relevance)\":\n return \"mmr\"\n return \"similarity\"\n\n def _build_search_args(self):\n query = self.search_query if isinstance(self.search_query, str) and self.search_query.strip() else None\n\n if query:\n args = {\n \"query\": query,\n \"search_type\": self._map_search_type(),\n \"k\": self.number_of_results,\n \"score_threshold\": self.search_score_threshold,\n }\n elif self.advanced_search_filter:\n args = {\n \"n\": self.number_of_results,\n }\n else:\n return {}\n\n filter_arg = self.advanced_search_filter or {}\n if filter_arg:\n args[\"filter\"] = filter_arg\n\n return args\n\n def search_documents(self, vector_store=None) -> list[Data]:\n vector_store = vector_store or self.build_vector_store()\n\n self.log(f\"Search input: {self.search_query}\")\n self.log(f\"Search type: {self.search_type}\")\n self.log(f\"Number of results: {self.number_of_results}\")\n\n try:\n search_args = self._build_search_args()\n except Exception as e:\n msg = f\"Error in AstraDBVectorStore._build_search_args: {e}\"\n raise ValueError(msg) from e\n\n if not search_args:\n self.log(\"No search input or filters provided. Skipping search.\")\n return []\n\n docs = []\n search_method = \"search\" if \"query\" in search_args else \"metadata_search\"\n\n try:\n self.log(f\"Calling vector_store.{search_method} with args: {search_args}\")\n docs = getattr(vector_store, search_method)(**search_args)\n except Exception as e:\n msg = f\"Error performing {search_method} in AstraDBVectorStore: {e}\"\n raise ValueError(msg) from e\n\n self.log(f\"Retrieved documents: {len(docs)}\")\n\n data = docs_to_data(docs)\n self.log(f\"Converted documents to data: {len(data)}\")\n self.status = data\n return data\n\n def get_retriever_kwargs(self):\n search_args = self._build_search_args()\n return {\n \"search_type\": self._map_search_type(),\n \"search_kwargs\": search_args,\n }\n" + "value": "import os\n\nfrom astrapy import DataAPIClient\nfrom astrapy.admin import parse_api_endpoint\nfrom langchain_astradb import AstraDBVectorStore\n\nfrom langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store\nfrom langflow.helpers import docs_to_data\nfrom langflow.inputs import DictInput, FloatInput, MessageTextInput, NestedDictInput\nfrom langflow.io import (\n BoolInput,\n DataInput,\n DropdownInput,\n HandleInput,\n IntInput,\n SecretStrInput,\n StrInput,\n)\nfrom langflow.schema import Data\nfrom langflow.utils.version import get_version_info\n\n\nclass AstraDBVectorStoreComponent(LCVectorStoreComponent):\n display_name: str = \"Astra DB\"\n description: str = \"Ingest and search documents in Astra DB\"\n documentation: str = \"https://docs.datastax.com/en/langflow/astra-components.html\"\n name = \"AstraDB\"\n icon: str = \"AstraDB\"\n\n _cached_vector_store: AstraDBVectorStore | None = None\n\n class NewDatabaseInput(DictInput):\n title: str = \"Create New Database\"\n description: str = \"Create a new database in Astra DB.\"\n db_names: list[str] = []\n status: str = \"\"\n collection_count: int = 0\n record_count: int = 0\n\n class NewCollectionInput(DictInput):\n title: str = \"Create New Collection\"\n description: str = \"Create a new collection in Astra DB.\"\n status: str = \"\"\n dimensions: int = 0\n model: str = \"\"\n similarity_metrics: list[str] = []\n icon: str = \"Collection\"\n\n base_inputs = LCVectorStoreComponent.inputs\n if \"search_query\" not in [input_.name for input_ in base_inputs]:\n base_inputs.append(\n MessageTextInput(\n name=\"search_query\",\n display_name=\"Search Query\",\n tool_mode=True,\n )\n )\n if \"ingest_data\" not in [input_.name for input_ in base_inputs]:\n base_inputs.append(\n DataInput(\n name=\"ingest_data\",\n display_name=\"Ingest Data\",\n )\n )\n\n inputs = [\n SecretStrInput(\n name=\"token\",\n display_name=\"Astra DB Application Token\",\n info=\"Authentication token for accessing Astra DB.\",\n value=\"ASTRA_DB_APPLICATION_TOKEN\",\n required=True,\n advanced=os.getenv(\"ASTRA_ENHANCED\", \"false\").lower() == \"true\",\n real_time_refresh=True,\n ),\n SecretStrInput(\n name=\"api_endpoint\",\n display_name=\"API Endpoint\",\n info=\"The Astra DB API Endpoint to use. Overrides selection of database.\",\n required=True,\n refresh_button=True,\n real_time_refresh=True,\n advanced=True,\n ),\n DropdownInput(\n name=\"database_name\",\n display_name=\"Database\",\n info=\"Select a database in Astra DB.\",\n required=True,\n refresh_button=True,\n real_time_refresh=True,\n dialog_inputs=[NewDatabaseInput(name=\"database_input\").__dict__],\n options=[],\n value=\"\",\n ),\n DropdownInput(\n name=\"collection_name\",\n display_name=\"Collection\",\n info=\"The name of the collection within Astra DB where the vectors will be stored.\",\n required=True,\n refresh_button=True,\n real_time_refresh=True,\n dialog_inputs=[NewCollectionInput(name=\"collection_input\").__dict__],\n options=[],\n value=\"\",\n ),\n StrInput(\n name=\"keyspace\",\n display_name=\"Keyspace\",\n info=\"Optional keyspace within Astra DB to use for the collection.\",\n advanced=True,\n ),\n HandleInput(\n name=\"embedding_model\",\n display_name=\"Embedding Model\",\n input_types=[\"Embeddings\"],\n info=\"Allows an embedding model configuration.\",\n ),\n *base_inputs,\n IntInput(\n name=\"number_of_results\",\n display_name=\"Number of Search Results\",\n info=\"Number of search results to return.\",\n advanced=True,\n value=4,\n ),\n DropdownInput(\n name=\"search_type\",\n display_name=\"Search Type\",\n info=\"Search type to use\",\n options=[\"Similarity\", \"Similarity with score threshold\", \"MMR (Max Marginal Relevance)\"],\n value=\"Similarity\",\n advanced=True,\n ),\n FloatInput(\n name=\"search_score_threshold\",\n display_name=\"Search Score Threshold\",\n info=\"Minimum similarity score threshold for search results. \"\n \"(when using 'Similarity with score threshold')\",\n value=0,\n advanced=True,\n ),\n NestedDictInput(\n name=\"advanced_search_filter\",\n display_name=\"Search Metadata Filter\",\n info=\"Optional dictionary of filters to apply to the search query.\",\n advanced=True,\n ),\n StrInput(\n name=\"content_field\",\n display_name=\"Content Field\",\n info=\"Field to use as the text content field for the vector store.\",\n advanced=True,\n ),\n BoolInput(\n name=\"ignore_invalid_documents\",\n display_name=\"Ignore Invalid Documents\",\n info=\"Boolean flag to determine whether to ignore invalid documents at runtime.\",\n advanced=True,\n ),\n NestedDictInput(\n name=\"astradb_vectorstore_kwargs\",\n display_name=\"AstraDBVectorStore Parameters\",\n info=\"Optional dictionary of additional parameters for the AstraDBVectorStore.\",\n advanced=True,\n ),\n ]\n\n def get_database_list(self):\n # Get the admin object\n client = DataAPIClient(token=self.token)\n admin_client = client.get_admin(token=self.token)\n db_list = list(admin_client.list_databases())\n\n # Generate the api endpoint for each database\n return {\n db.info.name: {\n \"api_endpoint\": f\"https://{db.info.id}-{db.info.region}.apps.astra.datastax.com\",\n \"collections\": len(list(client.get_database(db.info.id, token=self.token).list_collection_names())),\n \"records\": 0,\n }\n for db in db_list\n }\n\n def get_api_endpoint(self):\n # If the API endpoint is set, return it\n if self.api_endpoint:\n return self.api_endpoint\n\n # If the database is not set, nothing we can do.\n if not self.database_name:\n return None\n\n # Otherwise, get the URL from the database list\n return self.get_database_list().get(self.database_name)\n\n def get_database(self):\n try:\n client = DataAPIClient(token=self.token)\n\n return client.get_database(\n self.get_api_endpoint(),\n token=self.token,\n )\n except Exception as e: # noqa: BLE001\n self.log(f\"Error getting database: {e}\")\n\n return None\n\n def collection_exists(self):\n try:\n client = DataAPIClient(token=self.token)\n database = client.get_database(\n self.get_api_endpoint(),\n token=self.token,\n )\n return self.collection_name in list(database.list_collections())\n except Exception as e: # noqa: BLE001\n self.log(f\"Error getting collection status: {e}\")\n\n return False\n\n def _initialize_database_options(self):\n try:\n return [\n {\"name\": name, \"collections\": info[\"collections\"], \"records\": info[\"records\"]}\n for name, info in self.get_database_list().items()\n ]\n except Exception as e: # noqa: BLE001\n self.log(f\"Error fetching databases: {e}\")\n\n return []\n\n def _initialize_collection_options(self):\n database = self.get_database()\n if database is None:\n return []\n\n try:\n collection_list = list(database.list_collections())\n\n return [\n {\n \"name\": col.name,\n \"records\": 0,\n \"provider\": col.options.vector.service.provider if col.options.vector else \"\",\n \"model\": col.options.vector.service.model_name if col.options.vector else \"\",\n }\n for col in collection_list\n ]\n except Exception as e: # noqa: BLE001\n self.log(f\"Error fetching collections: {e}\")\n\n return []\n\n def update_build_config(self, build_config: dict, field_value: str, field_name: str | None = None): # noqa: ARG002\n # Refresh the collection name options\n build_config[\"database_name\"][\"options\"] = self._initialize_database_options()\n build_config[\"collection_name\"][\"options\"] = self._initialize_collection_options()\n\n return build_config\n\n @check_cached_vector_store\n def build_vector_store(self):\n try:\n from langchain_astradb import AstraDBVectorStore\n except ImportError as e:\n msg = (\n \"Could not import langchain Astra DB integration package. \"\n \"Please install it with `pip install langchain-astradb`.\"\n )\n raise ImportError(msg) from e\n\n # Get the embedding model and additional params\n embedding_params = {\"embedding\": self.embedding_model} if self.embedding_model else {}\n additional_params = self.astradb_vectorstore_kwargs or {}\n\n # Get the running environment for Langflow\n environment = (\n parse_api_endpoint(self.get_api_endpoint()).environment if self.get_api_endpoint() is not None else None\n )\n\n # Get Langflow version and platform information\n __version__ = get_version_info()[\"version\"]\n langflow_prefix = \"\"\n if os.getenv(\"LANGFLOW_HOST\") is not None:\n langflow_prefix = \"ds-\"\n\n # Bundle up the auto-detect parameters\n autodetect_params = {\n \"autodetect_collection\": self.collection_exists(), # TODO: May want to expose this option\n \"content_field\": self.content_field or None,\n \"ignore_invalid_documents\": self.ignore_invalid_documents,\n }\n\n # Attempt to build the Vector Store object\n try:\n vector_store = AstraDBVectorStore(\n # Astra DB Authentication Parameters\n token=self.token,\n api_endpoint=self.get_api_endpoint(),\n namespace=self.keyspace or None,\n collection_name=self.collection_name,\n environment=environment,\n # Astra DB Usage Tracking Parameters\n ext_callers=[(f\"{langflow_prefix}langflow\", __version__)],\n # Astra DB Vector Store Parameters\n **autodetect_params,\n **embedding_params,\n **additional_params,\n )\n except Exception as e:\n msg = f\"Error initializing AstraDBVectorStore: {e}\"\n raise ValueError(msg) from e\n\n self._add_documents_to_vector_store(vector_store)\n\n return vector_store\n\n def _add_documents_to_vector_store(self, vector_store) -> None:\n documents = []\n for _input in self.ingest_data or []:\n if isinstance(_input, Data):\n documents.append(_input.to_lc_document())\n else:\n msg = \"Vector Store Inputs must be Data objects.\"\n raise TypeError(msg)\n\n if documents:\n self.log(f\"Adding {len(documents)} documents to the Vector Store.\")\n try:\n vector_store.add_documents(documents)\n except Exception as e:\n msg = f\"Error adding documents to AstraDBVectorStore: {e}\"\n raise ValueError(msg) from e\n else:\n self.log(\"No documents to add to the Vector Store.\")\n\n def _map_search_type(self) -> str:\n if self.search_type == \"Similarity with score threshold\":\n return \"similarity_score_threshold\"\n if self.search_type == \"MMR (Max Marginal Relevance)\":\n return \"mmr\"\n return \"similarity\"\n\n def _build_search_args(self):\n query = self.search_query if isinstance(self.search_query, str) and self.search_query.strip() else None\n\n if query:\n args = {\n \"query\": query,\n \"search_type\": self._map_search_type(),\n \"k\": self.number_of_results,\n \"score_threshold\": self.search_score_threshold,\n }\n elif self.advanced_search_filter:\n args = {\n \"n\": self.number_of_results,\n }\n else:\n return {}\n\n filter_arg = self.advanced_search_filter or {}\n if filter_arg:\n args[\"filter\"] = filter_arg\n\n return args\n\n def search_documents(self, vector_store=None) -> list[Data]:\n vector_store = vector_store or self.build_vector_store()\n\n self.log(f\"Search input: {self.search_query}\")\n self.log(f\"Search type: {self.search_type}\")\n self.log(f\"Number of results: {self.number_of_results}\")\n\n try:\n search_args = self._build_search_args()\n except Exception as e:\n msg = f\"Error in AstraDBVectorStore._build_search_args: {e}\"\n raise ValueError(msg) from e\n\n if not search_args:\n self.log(\"No search input or filters provided. Skipping search.\")\n return []\n\n docs = []\n search_method = \"search\" if \"query\" in search_args else \"metadata_search\"\n\n try:\n self.log(f\"Calling vector_store.{search_method} with args: {search_args}\")\n docs = getattr(vector_store, search_method)(**search_args)\n except Exception as e:\n msg = f\"Error performing {search_method} in AstraDBVectorStore: {e}\"\n raise ValueError(msg) from e\n\n self.log(f\"Retrieved documents: {len(docs)}\")\n\n data = docs_to_data(docs)\n self.log(f\"Converted documents to data: {len(data)}\")\n self.status = data\n return data\n\n def get_retriever_kwargs(self):\n search_args = self._build_search_args()\n return {\n \"search_type\": self._map_search_type(),\n \"search_kwargs\": search_args,\n }\n" }, "collection_name": { "_input_type": "DropdownInput",