Skip to content

Commit

Permalink
upload traces in chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
DoKu88 committed Dec 15, 2024
1 parent f77e491 commit ad47f3e
Showing 1 changed file with 72 additions and 7 deletions.
79 changes: 72 additions & 7 deletions synth_sdk/tracing/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import json
from pprint import pprint
import asyncio
import sys
from pympler import asizeof


def validate_json(data: dict) -> None:
Expand All @@ -35,15 +37,13 @@ def createPayload(dataset: Dataset, traces: List[SystemTrace]) -> Dict[str, Any]
}
return payload

def send_system_traces(
dataset: Dataset, traces: List[SystemTrace], base_url: str, api_key: str,
async def send_system_traces(
dataset: Dataset, traces: List[SystemTrace], base_url: str, api_key: str, upload_id: str
):
# Send all system traces and dataset metadata to the server.
# Get the token using the API key
token_url = f"{base_url}/v1/auth/token"
token_response = requests.get(
token_url, headers={"customer_specific_api_key": api_key}
)
token_response = requests.get(token_url, headers={"customer_specific_api_key": api_key})
token_response.raise_for_status()
access_token = token_response.json()["access_token"]

Expand All @@ -54,9 +54,13 @@ def send_system_traces(

validate_json(payload) # Validate the entire payload

memory_size = asizeof.asizeof(payload) / 1024 # Memory size in KB
logging.info(f"Payload size (in memory): {memory_size:.2f} KB")

headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {access_token}",
"upload_id": upload_id,
"Authorization": f"Bearer {access_token}"
}

try:
Expand All @@ -74,6 +78,67 @@ def send_system_traces(
logging.error(f"An error occurred: {err}")
raise

def chunk_traces(traces: List[SystemTrace], chunk_size_kb: int = 1024):
"""Split traces into chunks that won't exceed approximately chunk_size_kb when serialized"""
chunks = []
current_chunk = []
current_size = 0

for trace in traces:
trace_dict = trace.to_dict()
trace_size = asizeof.asizeof(trace_dict) / 1024 # Memory size in KB
logging.info(f"Trace size (in memory): {trace_size:.2f} KB")

if current_size + trace_size > chunk_size_kb:
# Current chunk would exceed size limit, start new chunk
chunks.append(current_chunk)
current_chunk = [trace]
current_size = trace_size
else:
current_chunk.append(trace)
current_size += trace_size

if current_chunk:
chunks.append(current_chunk)

return chunks

async def get_upload_id(base_url: str, api_key: str):
token_url = f"{base_url}/v1/auth/token"
token_response = requests.get(token_url, headers={"customer_specific_api_key": api_key})
token_response.raise_for_status()
access_token = token_response.json()["access_token"]

api_url = f"{base_url}/v1/uploads/get-upload-id"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {access_token}",
}

try:
response = requests.get(api_url, headers=headers)
response.raise_for_status()
upload_id = response.json()["upload_id"]
return upload_id
except requests.exceptions.HTTPError as e:
logging.error(f"HTTP error occurred: {e}")
raise
except Exception as e:
logging.error(f"An error occurred: {e}")
raise

async def send_system_traces_chunked(dataset: Dataset, traces: List[SystemTrace],
base_url: str, api_key: str, chunk_size_kb: int = 1024):
"""Upload traces in chunks to avoid memory issues"""
trace_chunks = chunk_traces(traces, chunk_size_kb)
upload_id = await get_upload_id(base_url, api_key)
tasks = []
for chunk in trace_chunks:
task = asyncio.create_task(send_system_traces(dataset, chunk, base_url, api_key, upload_id))
tasks.append(task)

results = await asyncio.gather(*tasks)
return results

class UploadValidator(BaseModel):
traces: List[Dict[str, Any]]
Expand Down Expand Up @@ -272,7 +337,7 @@ def upload_helper(dataset: Dataset, traces: List[SystemTrace]=[], verbose: bool
print("Upload format validation successful")

# Send to server
response, payload = send_system_traces(
response, payload = send_system_traces_chunked(
dataset=dataset,
traces=traces,
base_url="https://agent-learning.onrender.com",
Expand Down

0 comments on commit ad47f3e

Please sign in to comment.