diff --git a/examples/documents_async.py b/examples/documents_async.py
new file mode 100644
index 0000000..27b3eeb
--- /dev/null
+++ b/examples/documents_async.py
@@ -0,0 +1,198 @@
+""" Using Zep as a vector database. A simple async example. """
+import asyncio
+from typing import List
+from uuid import uuid4
+
+from faker import Faker
+from zep_python import ZepClient
+from zep_python.document import Document
+
+fake = Faker()
+fake.random.seed(42)
+
+
+def naive_split_text(text: str, max_chunk_size: int):
+    """Naive text splitter chunks document into chunks of max_chunk_size,
+    using paragraphs and sentences as guides."""
+    chunks = []
+
+    # remove extraneous whitespace
+    text = " ".join(text.split())
+    # split into paragraphs
+    paragraphs = text.split("\n\n")
+
+    # clean up paragraphs
+    paragraphs = [p.strip() for p in paragraphs if len(p.strip()) > 0]
+
+    for paragraph in paragraphs:
+        if 0 < len(paragraph) <= max_chunk_size:
+            chunks.append(paragraph)
+        else:
+            sentences = paragraph.split(". ")
+            current_chunk = ""
+
+            for sentence in sentences:
+                if len(current_chunk) + len(sentence) > max_chunk_size:
+                    chunks.append(current_chunk)
+                    current_chunk = sentence
+                else:
+                    current_chunk += ". " + sentence
+
+            chunks.append(current_chunk)
+
+    return chunks
+
+
+def read_chunk_from_file(file: str, chunk_size: int):
+    with open(file, "r") as f:
+        text = f.read()
+
+    chunks = naive_split_text(text, chunk_size)
+
+    print(
+        f"Splitting text into {len(chunks)} chunks of max size {chunk_size} characters."
+    )
+
+    return chunks
+
+
+def print_results(results: List[Document]):
+    for result in results:
+        print(result.content, result.metadata, " -> ", result.score, "\n")
+
+
+async def main(file: str):
+    zep_api_url = "http://localhost:8000"
+    max_chunk_size = 500
+    collection_name = f"babbage{uuid4()}".replace("-", "")
+
+    print(f"Creating collection {collection_name}")
+
+    client = ZepClient(base_url=zep_api_url)
+    collection = client.document.add_collection(
+        name=collection_name,  # required
+        description="Charles Babbage's Babbage's Calculating Engine",  # optional
+        metadata=fake.pydict(allowed_types=[str]),  # optional metadata
+        embedding_dimensions=1536,  # this must match the model you've configured in Zep
+        is_auto_embedded=True,  # use Zep's built-in embedder. Defaults to True
+    )
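+    # If is_auto_embedded were False, each Document added below would need to
+    # carry its own embedding vector (Document.embedding) before upload; with
+    # the default of True, Zep embeds the documents server-side.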
+
+    chunks = read_chunk_from_file(file, max_chunk_size)
+
+    documents = [
+        Document(
+            content=chunk,
+            document_id=fake.unique.file_name(extension="txt"),
+            metadata=fake.pydict(allowed_types=[str]),  # optional metadata
+        )
+        for chunk in chunks
+    ]
+
+    print(f"Adding {len(documents)} documents to collection {collection_name}")
+
+    uuids = await collection.aadd_documents(documents)
+
+    print(f"Added {len(uuids)} documents to collection {collection_name}")
+
+    # monitor embedding progress
+    while True:
+        c = await client.document.aget_collection(collection_name)
+        print(
+            "Embedding status: "
+            f"{c.document_embedded_count}/{c.document_count} documents embedded"
+        )
+        await asyncio.sleep(1)  # non-blocking sleep between polls
+        if c.status == "ready":
+            break
+
+    # List all collections
+    collections = await client.document.alist_collections()
+    print(f"Found {len(collections)} collections")
+    print("\n".join([c.name for c in collections]))
+
+    # Update collection description and metadata
+    await client.document.aupdate_collection(
+        collection_name,
+        description="Charles Babbage's Babbage's Calculating Engine 2",
+        metadata=fake.pydict(allowed_types=[str]),
+    )
+
+    # Get updated collection
+    collection = await client.document.aget_collection(collection_name)
+    print(f"Updated collection description: {collection.description}")
+
+    # search for documents
+    # Using "the moon" here as we should find documents related to "astronomy"
+    query = "the moon"
+    search_results = await collection.asearch(text=query, limit=5)
+    print(f"Found {len(search_results)} documents matching query '{query}'")
+    print_results(search_results)
+
+    # retrieve a single document by uuid
+    document_to_retrieve = uuids[25]
+    print(f"Retrieving document {document_to_retrieve}")
+    retrieved_document = await collection.aget_document(document_to_retrieve)
+    print(retrieved_document.dict())
+
+    # Update a document's metadata
+    print(f"Updating document {document_to_retrieve} metadata")
+    await collection.aupdate_document(
+        document_to_retrieve,
+        description="Charles Babbage's Babbage's Calculating Engine 2",
+        metadata={"foo": "bar", "baz": "qux"},
+    )
+
+    # search for documents using both text and metadata
+    metadata_query = {
+        "where": {"jsonpath": '$[*] ? (@.baz == "qux")'},
+    }
+    new_search_results = await collection.asearch(
+        text=query, metadata=metadata_query, limit=5
+    )
+    print(
+        f"Found {len(new_search_results)} documents matching query '{query}'"
+        f" {metadata_query}"
+    )
+    print_results(new_search_results)
+
+    # Search by embedding
+    interesting_document = search_results[0]
+    print(f"Searching for documents similar to:\n{interesting_document.content}\n")
+    embedding_search_results = await collection.asearch(
+        embedding=interesting_document.embedding, limit=5
+    )
+    print(f"Found {len(embedding_search_results)} documents matching embedding")
+    print("Most similar documents:")
+    print_results(embedding_search_results)
+
+    # delete a document
+    print(f"Deleting document {document_to_retrieve}")
+    await collection.adelete_document(document_to_retrieve)
+
+    # Get a list of documents in the collection by uuid
+    docs_to_get = uuids[40:50]
+    print(f"Getting documents: {docs_to_get}")
+    documents = await collection.aget_documents(docs_to_get)
+    print(f"Got {len(documents)} documents")
+    print_results(documents)
+
+    # Index the collection
+    # We wouldn't ordinarily do this until the collection is larger.
+    # See the documentation for more details.
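+    # force=True is needed here because this collection is far smaller than
+    # MIN_DOCS_TO_INDEX (10,000), the minimum size Zep expects before indexing.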
+    print(f"Indexing collection {collection_name}")
+    collection.create_index(force=True)  # Do not use force unless testing!
+
+    # search for documents now that the collection is indexed
+    search_results = collection.search(text=query, limit=5)
+    print(f"Found {len(search_results)} documents matching query '{query}'")
+    print_results(search_results)
+
+    # Delete the collection
+    print(f"Deleting collection {collection_name}")
+    await client.document.adelete_collection(collection_name)
+
+
+if __name__ == "__main__":
+    file = "babbages_calculating_engine.txt"
+    asyncio.run(main(file))
diff --git a/pyproject.toml b/pyproject.toml
index 7c92187..2469bc5 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "zep-python"
-version = "1.2.0"
+version = "1.2.1"
 description = "Zep: Fast, scalable building blocks for LLM apps. This is the Python client for the Zep service."
 authors = ["Daniel Chalef "]
 readme = "README.md"
diff --git a/tests/documents_test.py b/tests/documents_test.py
index fb1a625..dd9125e 100644
--- a/tests/documents_test.py
+++ b/tests/documents_test.py
@@ -12,6 +12,7 @@
 from zep_python.document.collections import (
     LARGE_BATCH_WARNING_LIMIT,
     DocumentCollection,
+    generate_batches,
 )
 from zep_python.exceptions import APIError, NotFoundError
 from zep_python.zep_client import ZepClient
@@ -684,6 +685,15 @@ async def test_asearch_documents_api_error(
         await mock_collection.asearch("search_text", {"key": "value"}, 10)
 
 
+def test_generate_batches():
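+    # 10 documents split with batch_size=5 should yield exactly two full batches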
+    documents = [gen_mock_document("test_collection", i) for i in range(10)]
+    batches = list(generate_batches(documents, 5))
+
+    assert len(batches) == 2
+    for batch in batches:
+        assert len(batch) == 5
+
+
 def generate_mock_collection(
     col_id: Union[int, str], with_clients: bool = False
 ) -> DocumentCollection:
diff --git a/zep_python/document/collections.py b/zep_python/document/collections.py
index f0dc714..f3afe21 100644
--- a/zep_python/document/collections.py
+++ b/zep_python/document/collections.py
@@ -17,7 +17,7 @@
 from .models import Document, DocumentCollectionModel
 
 MIN_DOCS_TO_INDEX = 10_000
-DEFAULT_BATCH_SIZE = 100
+DEFAULT_BATCH_SIZE = 500
 LARGE_BATCH_WARNING_LIMIT = 1000
 LARGE_BATCH_WARNING = (
     f"Batch size is greater than {LARGE_BATCH_WARNING_LIMIT}. "
@@ -80,13 +80,19 @@ def status(self) -> str:
         else:
             return "pending"
 
-    async def aadd_documents(self, documents: List[Document]) -> List[str]:
+    async def aadd_documents(
+        self,
+        documents: List[Document],
+        batch_size: int = DEFAULT_BATCH_SIZE,
+    ) -> List[str]:
         """
         Asynchronously create documents.
 
 
         documents : List[Document]
             A list of Document objects representing the documents to create.
+        batch_size : int, optional
+            The number of documents to upload in each batch. Defaults to 500.
 
         Returns
         -------
@@ -108,7 +114,7 @@ async def aadd_documents(self, documents: List[Document]) -> List[str]:
             raise ValueError("document list must be provided")
 
         uuids: List[str] = []
-        for batch in generate_batches(documents, DEFAULT_BATCH_SIZE):
+        for batch in generate_batches(documents, batch_size):
             response = await self._aclient.post(
                 f"/collection/{self.name}/document",
                 json=batch,
@@ -120,7 +126,11 @@ async def aadd_documents(self, documents: List[Document]) -> List[str]:
 
         return uuids
 
-    def add_documents(self, documents: List[Document]) -> List[str]:
+    def add_documents(
+        self,
+        documents: List[Document],
+        batch_size: int = DEFAULT_BATCH_SIZE,
+    ) -> List[str]:
         """
         Create documents.
 
@@ -132,6 +142,8 @@ def add_documents(self, documents: List[Document]) -> List[str]:
         -------
         List[str]
             The UUIDs of the created documents.
+        batch_size : int, optional
+            The number of documents to upload in each batch. Defaults to 500.
 
         Raises
         ------
@@ -147,7 +159,7 @@ def add_documents(self, documents: List[Document]) -> List[str]:
             raise ValueError("document list must be provided")
 
         uuids: List[str] = []
-        for batch in generate_batches(documents, DEFAULT_BATCH_SIZE):
+        for batch in generate_batches(documents, batch_size):
             response = self._client.post(
                 f"/collection/{self.name}/document",
                 json=batch,
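
Illustrative usage of the new batch_size parameter (a minimal sketch, not part of the patch; it assumes a running Zep server at http://localhost:8000 and an existing auto-embedded collection named "mycollection"):

    from zep_python import ZepClient
    from zep_python.document import Document

    client = ZepClient(base_url="http://localhost:8000")
    collection = client.document.get_collection("mycollection")

    docs = [Document(content=f"chunk {i}") for i in range(2_000)]

    # batch_size now defaults to 500; override it to upload in smaller batches
    uuids = collection.add_documents(docs, batch_size=250)
    print(f"Added {len(uuids)} documents in batches of 250")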