From b4072a7711dc634f6305d76865ecff4f4b4e26a5 Mon Sep 17 00:00:00 2001 From: doku88 Date: Tue, 5 Nov 2024 14:32:43 -0800 Subject: [PATCH] optimizing upload test speed for cicd --- testing/upload_sync_test.py | 172 ++++++++++++------------------------ 1 file changed, 58 insertions(+), 114 deletions(-) diff --git a/testing/upload_sync_test.py b/testing/upload_sync_test.py index 209e93b..0deadf3 100644 --- a/testing/upload_sync_test.py +++ b/testing/upload_sync_test.py @@ -5,12 +5,11 @@ from synth_sdk.tracing.abstractions import TrainingQuestion, RewardSignal, Dataset from synth_sdk.tracing.events.store import event_store from typing import Dict -import asyncio -import synth_sdk.config.settings +#import asyncio +#import synth_sdk.config.settings import time -import json +#import json import logging -from pprint import pprint import pytest # Configure logging @@ -20,6 +19,10 @@ ) logger = logging.getLogger(__name__) +questions = ["What's the capital of France?", + "What's 2+2?", + "Who wrote Romeo and Juliet?",] + class TestAgent: def __init__(self): self.system_id = "test_agent_upload" @@ -36,7 +39,7 @@ def __init__(self): event_type="lm_call", manage_event="create", increment_partition=True, - verbose=True, + verbose=False, ) def make_lm_call(self, user_message: str) -> str: # Calls an LLM to respond to a user message # Only pass the user message, not self @@ -57,7 +60,7 @@ def make_lm_call(self, user_message: str) -> str: # Calls an LLM to respond to a origin="environment", event_type="environment_processing", manage_event="create", - verbose=True, + verbose=False, ) def process_environment(self, input_data: str) -> dict: # Only pass the input data, not self @@ -81,114 +84,55 @@ def generate_payload_from_data(self, dataset: Dataset) -> Dict: return payload @pytest.mark.asyncio -async def test_upload(show_payload: bool = False): +async def test_upload(): logger.info("Starting run_test") - # Create test agent - agent = TestAgent() - - try: - # List of test questions - questions = [ - "What's the capital of France?", - "What's 2+2?", - "Who wrote Romeo and Juliet?", - ] - logger.debug("Test questions initialized: %s", questions) - - # Make multiple LM calls with environment processing - responses = [] - for i, question in enumerate(questions): - logger.info("Processing question %d: %s", i, question) - try: - env_result = agent.process_environment(question) - logger.debug("Environment processing result: %s", env_result) - - response = agent.make_lm_call(question) - responses.append(response) - logger.debug("Response received and stored: %s", response) - except Exception as e: - logger.error("Error during processing: %s", str(e), exc_info=True) - continue - - logger.info("Creating dataset for upload") - # Create dataset for upload - dataset = Dataset( - questions=[ - TrainingQuestion( - intent="Test question", - criteria="Testing tracing functionality", - question_id=f"q{i}", - ) - for i in range(len(questions)) - ], - reward_signals=[ - RewardSignal( - question_id=f"q{i}", - system_id=agent.system_id, - reward=1.0, - annotation="Test reward", - ) - for i in range(len(questions)) - ], - ) - logger.debug( - "Dataset created with %d questions and %d reward signals", - len(dataset.questions), - len(dataset.reward_signals), - ) + agent = TestAgent() # Create test agent + + logger.debug("Test questions initialized: %s", questions) # List of test questions + + # Make multiple LM calls with environment processing + responses = [] + for i, question in enumerate(questions): + logger.info("Processing question %d: %s", i, question) + env_result = agent.process_environment(question) + logger.debug("Environment processing result: %s", env_result) + + response = agent.make_lm_call(question) + responses.append(response) + logger.debug("Response received and stored: %s", response) + + logger.info("Creating dataset for upload") + # Create dataset for upload + dataset = Dataset( + questions=[ + TrainingQuestion( + intent="Test question", + criteria="Testing tracing functionality", + question_id=f"q{i}", + ) + for i in range(len(questions)) + ], + reward_signals=[ + RewardSignal( + question_id=f"q{i}", + system_id=agent.system_id, + reward=1.0, + annotation="Test reward", + ) + for i in range(len(questions)) + ], + ) + logger.debug( + "Dataset created with %d questions and %d reward signals", + len(dataset.questions), + len(dataset.reward_signals), + ) - # Upload traces - try: - logger.info("Attempting to upload traces") - response, payload = await upload(dataset=dataset, verbose=True) - logger.info("Upload successful!") - print("Upload successful!") - if show_payload: - logger.info("Payload sent to server:") - pprint(payload) - - except Exception as e: - logger.error("Upload failed: %s", str(e), exc_info=True) - print(f"Upload failed: {str(e)}") - - # Print debug information - traces = event_store.get_system_traces() - logger.debug("Retrieved %d system traces", len(traces)) - print("\nTraces:") - print(json.dumps([trace.to_dict() for trace in traces], indent=2)) - - print("\nDataset:") - print(json.dumps(dataset.to_dict(), indent=2)) - finally: - logger.info("Starting cleanup") - # Cleanup - if hasattr(_local, "active_events"): - for event_type, event in _local.active_events.items(): - logger.debug("Cleaning up event: %s", event_type) - if event.closed is None: - event.closed = time.time() - if hasattr(_local, "system_id"): - try: - event_store.add_event(_local.system_id, event) - logger.debug( - "Successfully cleaned up event: %s", event_type - ) - except Exception as e: - logger.error( - "Error during cleanup of event %s: %s", - event_type, - str(e), - exc_info=True, - ) - print( - f"Error during cleanup of event {event_type}: {str(e)}" - ) - logger.info("Cleanup completed") - assert payload == agent.generate_payload_from_data(dataset) + # Upload traces + logger.info("Attempting to upload traces") + response, payload = await upload(dataset=dataset, verbose=True) + logger.info("Upload successful!") + logger.info("Payload sent to server:") -# Run a sample agent using the async decorator and tracker -if __name__ == "__main__": - logger.info("Starting main execution") - asyncio.run(test_upload()) - logger.info("Main execution completed") - logger.info("Check Supabase table traces for uploaded data use UPLOAD ID key to filter") + # Pytest assertion + assert payload == agent.generate_payload_from_data(dataset)