DO NOT MERGE, wip audio evals #5616

Draft
wants to merge 19 commits into base: main
Changes from 11 commits
224 changes: 223 additions & 1 deletion packages/phoenix-evals/src/phoenix/evals/classify.py
@@ -6,6 +6,7 @@
from itertools import product
from typing import (
Any,
Callable,
DefaultDict,
Dict,
Iterable,
@@ -33,6 +34,7 @@
normalize_classification_template,
)
from phoenix.evals.utils import (
Audio,
NOT_PARSABLE,
get_tqdm_progress_bar_formatter,
openai_function_call_kwargs,
@@ -203,7 +205,7 @@ def _process_response(response: str) -> Tuple[str, Optional[str]]:
printif(
verbose and unrailed_label == NOT_PARSABLE,
f"- Could not parse {repr(response)}",
)
)
else:
unrailed_label = response
explanation = None
@@ -268,6 +270,226 @@ def _run_llm_classification_sync(input_data: pd.Series[Any]) -> ParsedLLMResponse
)


def audio_classify(
dataframe: Union[List, pd.DataFrame],
model: BaseModel,
template: Union[ClassificationTemplate, PromptTemplate, str],
rails: List[str],
data_fetcher: Optional[Callable[[str], Audio]],
Contributor
I still don't think we should be requiring the user return our internal type, it feels like a structure they need to learn / import from that feels clunky
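
For context, a minimal sketch of what a caller has to supply today, assuming Audio is a small dataclass exposing data (base64-encoded audio) and format (an AudioFormat member), as suggested by its usage in _build_messages below; the URL, WAV format, and use of requests are illustrative assumptions:

import base64

import requests  # any HTTP client works; requests is assumed here for illustration

from phoenix.evals.utils import Audio, AudioFormat


def fetch_audio(url: str) -> Audio:
    # Download the audio file and wrap it in the library's Audio container.
    audio_bytes = requests.get(url, timeout=30).content
    return Audio(
        data=base64.b64encode(audio_bytes).decode("utf-8"),
        format=AudioFormat.WAV,  # assumed member; derive from the URL or content type in practice
    )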

system_instruction: Optional[str] = None,
verbose: bool = False,
use_function_calling_if_available: bool = True,
provide_explanation: bool = False,
include_prompt: bool = False,
include_response: bool = False,
include_exceptions: bool = False,
max_retries: int = 10,
exit_on_error: bool = True,
run_sync: bool = False,
concurrency: Optional[int] = None,
progress_bar_format: Optional[str] = get_tqdm_progress_bar_formatter("llm_classify"),
) -> pd.DataFrame:
"""
Classifies each input row of the dataframe using an LLM, resolving audio inputs via the provided `data_fetcher`.
Returns a pandas.DataFrame where the first column is named `label` and contains
the classification labels. An optional column named `explanation` is added when
`provide_explanation=True`.

Args:
dataframe (Union[List, pd.DataFrame]): A pandas dataframe in which each row represents
a record to be classified. All template variable names must appear as column
names in the dataframe (extra columns unrelated to the template are permitted).

template (Union[ClassificationTemplate, PromptTemplate, str]): The prompt template
as either an instance of PromptTemplate, ClassificationTemplate or a string.
If a string, the variable names should be surrounded by curly braces so that
a call to `.format` can be made to substitute variable values.

model (BaseModel): An LLM model instance.

rails (List[str]): A list of strings representing the possible output classes
of the model's predictions.

data_fetcher (Optional[Callable[[str], Audio]]): A callable that takes a cloud storage
audio url and returns an Audio object (audio data plus format) to pass to the LLM.

system_instruction (Optional[str], optional): An optional system message.

verbose (bool, optional): If True, prints detailed info to stdout such as
model invocation parameters and details about retries and snapping to rails.
Default False.

use_function_calling_if_available (bool, default=True): If True, use function
calling (if available) as a means to constrain the LLM outputs.
With function calling, the LLM is instructed to provide its response as a
structured JSON object, which is easier to parse.

provide_explanation (bool, default=False): If True, provides an explanation
for each classification label. A column named `explanation` is added to
the output dataframe.

include_prompt (bool, default=False): If True, includes a column named `prompt`
in the output dataframe containing the prompt used for each classification.

include_response (bool, default=False): If True, includes a column named `response`
in the output dataframe containing the raw response from the LLM.

max_retries (int, optional): The maximum number of times to retry on exceptions.
Defaults to 10.

exit_on_error (bool, default=True): If True, stops processing evals after all retries
are exhausted on a single eval attempt. If False, all evals are attempted before
returning, even if some fail.

run_sync (bool, default=False): If True, forces synchronous request submission.
Otherwise evaluations will be run asynchronously if possible.

concurrency (Optional[int], default=None): The number of concurrent evals if async
submission is possible. If not provided, a recommended default concurrency is
set on a per-model basis.

progress_bar_format (Optional[str]): An optional format for the progress bar. If not
specified, defaults to: "llm_classify |{bar}| {n_fmt}/{total_fmt} ({percentage:3.1f}%)
| ⏳ {elapsed}<{remaining} | {rate_fmt}{postfix}". If None is passed in explicitly,
the progress bar is disabled.

Returns:
pandas.DataFrame: A dataframe where the `label` column (at column position 0) contains
the classification labels. If provide_explanation=True, then an additional column named
`explanation` is added to contain the explanation for each label. The dataframe has
the same length and index as the input dataframe. The classification label values are
from the entries in the rails argument or "NOT_PARSABLE" if the model's output could
not be parsed. The output dataframe also includes three additional columns:
`exceptions`, `execution_status`, and `execution_seconds`, containing details about
execution errors that may have occurred during the classification as well as the
total runtime of each classification (in seconds).
"""
# Transform the audio urls to Audio objects:
# processed_data = None
# if isinstance(dataframe, pd.DataFrame):
# # processed_data = dataframe['audio_url'].apply(data_fetcher).tolist()
# processed_data = dataframe['audio_url'].apply(lambda url: data_fetcher(url).data).tolist()
# processed_format = dataframe['audio_url'].apply(lambda url: data_fetcher(url).format.value).to_list()
# elif isinstance(dataframe, list):
# processed_data = [data_fetcher(url).data for url in dataframe]

#audio_byte_series = pd.Series({'audio_bytes': processed_data, 'audio_format': processed_format})

concurrency = concurrency or model.default_concurrency
# clients need to be reloaded to ensure that async evals work properly
model.reload_client()

use_openai_function_call = (
use_function_calling_if_available
and isinstance(model, OpenAIModel)
and model.supports_function_calling
)

model_kwargs = (
openai_function_call_kwargs(rails, provide_explanation) if use_openai_function_call else {}
)

eval_template = normalize_classification_template(rails=rails, template=template)

prompt_options = PromptOptions(provide_explanation=provide_explanation)

labels: Iterable[Optional[str]] = [None] * len(dataframe)
explanations: Iterable[Optional[str]] = [None] * len(dataframe)

printif(verbose, f"Using prompt:\n\n{eval_template.prompt(prompt_options)}")
if generation_info := model.verbose_generation_info():
printif(verbose, generation_info)

def _map_template(data: pd.Series[Any]) -> MultimodalPrompt:
try:
variables = {var: data[var] for var in eval_template.variables}
empty_keys = [k for k, v in variables.items() if v is None]
if empty_keys:
raise PhoenixTemplateMappingError(
f"Missing template variables: {', '.join(empty_keys)}"
)
return eval_template.format(
variable_values=variables,
options=prompt_options,
)
except KeyError as exc:
raise PhoenixTemplateMappingError(f"Missing template variable: {exc}")

def _process_response(response: str) -> Tuple[str, Optional[str]]:
if not use_openai_function_call:
if provide_explanation:
unrailed_label, explanation = (
eval_template.extract_label_from_explanation(response),
response,
)
printif(
verbose and unrailed_label == NOT_PARSABLE,
f"- Could not parse {repr(response)}",
)
else:
unrailed_label = response
explanation = None
else:
unrailed_label, explanation = parse_openai_function_call(response)
return snap_to_rail(unrailed_label, rails, verbose=verbose), explanation

async def _run_llm_classification_async(input_data: pd.Series[Any]) -> ParsedLLMResponse:
with set_verbosity(model, verbose) as verbose_model:
prompt = _map_template(input_data)
response = await verbose_model._async_generate(
prompt, data_fetcher, instruction=system_instruction, **model_kwargs
)
inference, explanation = _process_response(response)
return inference, explanation, response, str(prompt)

def _run_llm_classification_sync(input_data: pd.Series[Any]) -> ParsedLLMResponse:
with set_verbosity(model, verbose) as verbose_model:
prompt = _map_template(input_data)
response = verbose_model._generate(
prompt, data_fetcher=data_fetcher, instruction=system_instruction, **model_kwargs
)
inference, explanation = _process_response(response)
return inference, explanation, response, str(prompt)

fallback_return_value: ParsedLLMResponse = (None, None, "", "")

executor = get_executor_on_sync_context(
_run_llm_classification_sync,
_run_llm_classification_async,
run_sync=run_sync,
concurrency=concurrency,
tqdm_bar_format=progress_bar_format,
max_retries=max_retries,
exit_on_error=exit_on_error,
fallback_return_value=fallback_return_value,
)

results, execution_details = executor.run([row_tuple[1] for row_tuple in dataframe.iterrows()])
labels, explanations, responses, prompts = zip(*results)
all_exceptions = [details.exceptions for details in execution_details]
execution_statuses = [details.status for details in execution_details]
execution_times = [details.execution_seconds for details in execution_details]
classification_statuses = []
for exceptions, status in zip(all_exceptions, execution_statuses):
if exceptions and isinstance(exceptions[-1], PhoenixTemplateMappingError):
classification_statuses.append(ClassificationStatus.MISSING_INPUT)
else:
classification_statuses.append(ClassificationStatus(status.value))

return pd.DataFrame(
data={
"label": labels,
**({"explanation": explanations} if provide_explanation else {}),
**({"prompt": prompts} if include_prompt else {}),
**({"response": responses} if include_response else {}),
**({"exceptions": [[repr(exc) for exc in excs] for excs in all_exceptions]}),
**({"execution_status": [status.value for status in classification_statuses]}),
**({"execution_seconds": [runtime for runtime in execution_times]}),
},
index=dataframe.index,
)


class RunEvalsPayload(NamedTuple):
evaluator: LLMEvaluator
record: Record
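For reference, a hedged end-to-end sketch of calling the new entry point, assuming a dataframe with an audio_url column, the hypothetical fetch_audio helper sketched above, and that PromptPartTemplate carries a template string field (see templates.py below):

import pandas as pd

from phoenix.evals.classify import audio_classify
from phoenix.evals.models import OpenAIModel
from phoenix.evals.templates import (
    ClassificationTemplate,
    PromptPartContentType,
    PromptPartTemplate,
)

rails = ["frustrated", "calm"]

# Text instructions plus an audio part; the audio part's template renders to the URL,
# which audio_classify hands to data_fetcher.
template = ClassificationTemplate(
    rails=rails,
    template=[
        PromptPartTemplate(
            content_type=PromptPartContentType.TEXT,
            template="Decide whether the caller in the audio sounds frustrated or calm.",
        ),
        PromptPartTemplate(
            content_type=PromptPartContentType.AUDIO,
            template="{audio_url}",
        ),
    ],
)

df = pd.DataFrame({"audio_url": ["https://storage.example.com/recordings/call-1.wav"]})

results = audio_classify(
    dataframe=df,
    model=OpenAIModel(model="gpt-4o-audio-preview"),  # assumed audio-capable model name
    template=template,
    rails=rails,
    data_fetcher=fetch_audio,  # hypothetical helper from the earlier sketch
)
print(results["label"])
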
42 changes: 32 additions & 10 deletions packages/phoenix-evals/src/phoenix/evals/models/openai.py
@@ -18,6 +18,7 @@
from phoenix.evals.models.base import BaseModel
from phoenix.evals.models.rate_limiters import RateLimiter
from phoenix.evals.templates import MultimodalPrompt, PromptPartContentType
from phoenix.evals.utils import AudioFormat, Audio

MINIMUM_OPENAI_VERSION = "1.0.0"
MODEL_TOKEN_LIMIT_MAPPING = {
@@ -279,27 +280,43 @@ def _init_rate_limiter(self) -> None:
)

def _build_messages(
self, prompt: MultimodalPrompt, system_instruction: Optional[str] = None
self, prompt: MultimodalPrompt, data_fetcher: Optional[Callable[[str], Audio]] = None, system_instruction: Optional[str] = None
Contributor
Also note that the type annotation on the callable does not match the docstring, it's best if users return a familiar type instead of an internal one I think

) -> List[Dict[str, Any]]:
messages = []
for parts in prompt.parts:
if parts.content_type == PromptPartContentType.TEXT:
messages.append({"role": "system", "content": parts.content})
for part in prompt.parts:
if part.content_type == PromptPartContentType.TEXT:
messages.append({"role": "system", "content": part.content})
elif part.content_type == PromptPartContentType.AUDIO:
if data_fetcher is None:
raise ValueError("A data_fetcher is required to resolve audio content")
audio_object = data_fetcher(part.content)
Contributor
I don't super like the fact that we've plumbed an arbitrary callable down to this level, maybe I'm overthinking it

messages.append(
{
"role": "user",
"content": [
{
"type": "input_audio",
"input_audio": {
"data": audio_object.data,
"format": audio_object.format.value
}
}
],
}
)
else:
raise ValueError(f"Unsupported content type: {parts.content_type}")
raise ValueError(f"Unsupported content type: {part.content_type}")
if system_instruction:
messages.insert(0, {"role": "system", "content": str(system_instruction)})
return messages
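
For a prompt with one TEXT part and one AUDIO part, the list assembled above would come out roughly as follows (a sketch of the expected shape, not captured output; the base64 payload is abbreviated):

# Hypothetical value of `messages` for one text part and one audio part:
messages = [
    {"role": "system", "content": "Decide whether the caller sounds frustrated or calm."},
    {
        "role": "user",
        "content": [
            {
                "type": "input_audio",
                "input_audio": {"data": "<base64-encoded audio>", "format": "wav"},
            }
        ],
    },
]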

def verbose_generation_info(self) -> str:
return f"OpenAI invocation parameters: {self.public_invocation_params}"

async def _async_generate(self, prompt: Union[str, MultimodalPrompt], **kwargs: Any) -> str:
async def _async_generate(self, prompt: Union[str, MultimodalPrompt], data_fetcher: Optional[Callable[[str], Audio]] = None, **kwargs: Any) -> str:
Contributor
it's probably not a good idea to change the interface for _async_generate, as it's used widely across Phoenix. That said, a kwarg shouldn't drastically change things, so let me think about this.

if isinstance(prompt, str):
prompt = MultimodalPrompt.from_string(prompt)

invoke_params = self.invocation_params
messages = self._build_messages(prompt, kwargs.get("instruction"))
messages = self._build_messages(prompt, data_fetcher, kwargs.get("instruction"))
if functions := kwargs.get("functions"):
invoke_params["functions"] = functions
if function_call := kwargs.get("function_call"):
@@ -316,12 +333,16 @@ async def _async_generate(self, prompt: Union[str, MultimodalPrompt], **kwargs:
return str(function_call.get("arguments") or "")
return str(message["content"])

def _generate(self, prompt: Union[str, MultimodalPrompt], **kwargs: Any) -> str:
def _generate(self, prompt: Union[str, MultimodalPrompt], data_fetcher: Optional[Callable[[str], Audio]] = None, **kwargs: Any) -> str:
if isinstance(prompt, str):
prompt = MultimodalPrompt.from_string(prompt)

invoke_params = self.invocation_params
messages = self._build_messages(prompt, kwargs.get("instruction"))
messages = self._build_messages(
prompt=prompt,
data_fetcher=data_fetcher,
system_instruction=kwargs.get("instruction")
)
if functions := kwargs.get("functions"):
invoke_params["functions"] = functions
if function_call := kwargs.get("function_call"):
@@ -362,7 +383,7 @@ async def _async_completion(**kwargs: Any) -> Any:

return await _async_completion(**kwargs)

def _rate_limited_completion(self, **kwargs: Any) -> Any:
def _rate_limited_completion(self, audio_format: Optional[str] = None, **kwargs: Any) -> Any:
@self._rate_limiter.limit
def _completion(**kwargs: Any) -> Any:
try:
@@ -374,6 +395,7 @@ def _completion(**kwargs: Any) -> Any:
)
# OpenAI 1.0.0 API responses are pydantic objects, not dicts
# We must dump the model to get the dict
# TODO consider if there are additional changes + consider 'modalities' parameter
return self._client.completions.create(**kwargs).model_dump()
return self._client.chat.completions.create(**kwargs).model_dump()
except self._openai._exceptions.BadRequestError as e:
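On the TODO about the 'modalities' parameter: a hedged sketch of what the underlying client call might look like once audio is wired through; the model name and the need to pass modalities explicitly are assumptions to verify against the OpenAI docs, and fetch_audio is the hypothetical helper from the earlier sketch.

from openai import OpenAI

audio = fetch_audio("https://storage.example.com/recordings/call-1.wav")

client = OpenAI()
response = client.chat.completions.create(
    model="gpt-4o-audio-preview",  # assumed audio-capable model
    modalities=["text"],  # request text-only output even though the input includes audio
    messages=[
        {"role": "system", "content": "Decide whether the caller sounds frustrated or calm."},
        {
            "role": "user",
            "content": [
                {
                    "type": "input_audio",
                    "input_audio": {"data": audio.data, "format": audio.format.value},
                }
            ],
        },
    ],
)
print(response.choices[0].message.content)
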
6 changes: 5 additions & 1 deletion packages/phoenix-evals/src/phoenix/evals/templates.py
@@ -31,7 +31,9 @@ def get_field(self, field_name: str, args: Sequence[Any], kwargs: Mapping[str, A

class PromptPartContentType(str, Enum):
TEXT = "text"
AUDIO_URL = "audio_url"
# AUDIO_BYTES = "audio_bytes"
# AUDIO_FORMAT = "audio_format"
AUDIO = "audio"


@dataclass
Expand All @@ -40,6 +42,7 @@ class PromptPart:
content: str


# TODO: ask about rename to PromptTemplatePart
@dataclass
class PromptPartTemplate:
content_type: PromptPartContentType
Expand Down Expand Up @@ -125,6 +128,7 @@ def __init__(
self,
rails: List[str],
template: Union[str, List[PromptPartTemplate]],
# TODO: ask about this being optional but is called in line 145
explanation_template: Optional[Union[str, List[PromptPartTemplate]]] = None,
explanation_label_parser: Optional[Callable[[str], str]] = None,
delimiters: Tuple[str, str] = (DEFAULT_START_DELIM, DEFAULT_END_DELIM),
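
To tie the new AUDIO content type back to the message building in openai.py: after formatting, the prompt is a MultimodalPrompt whose audio parts hold the rendered URL as their content, which the data_fetcher later resolves. A minimal sketch, assuming MultimodalPrompt accepts a list of PromptPart instances:

from phoenix.evals.templates import MultimodalPrompt, PromptPart, PromptPartContentType

prompt = MultimodalPrompt(
    parts=[
        PromptPart(
            content_type=PromptPartContentType.TEXT,
            content="Decide whether the caller sounds frustrated or calm.",
        ),
        PromptPart(
            content_type=PromptPartContentType.AUDIO,
            content="https://storage.example.com/recordings/call-1.wav",
        ),
    ]
)
# _build_messages turns the TEXT part into a system message and resolves the AUDIO
# part's URL through data_fetcher into an input_audio user message.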