Skip to content

Commit

Permalink
fix: sync convert to parsers (#186)
Browse files Browse the repository at this point in the history
* add: sync convert to parsers

* add: tests
  • Loading branch information
chloedia authored Dec 16, 2024
1 parent 8c18861 commit fbb7d36
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 41 deletions.
2 changes: 1 addition & 1 deletion libs/megaparse/src/megaparse/megaparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ async def aload(
try:
parser = self._select_parser(file_path, file, file_extension)
logger.info(f"Parsing using {parser.__class__.__name__} parser.")
parsed_document = await parser.convert(
parsed_document = await parser.aconvert(
file_path=file_path, file=file, file_extension=file_extension
)
# @chloe FIXME: format_checker needs unstructured Elements as input which is to change
Expand Down
25 changes: 24 additions & 1 deletion libs/megaparse/src/megaparse/parser/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,30 @@ class BaseParser(ABC):
"""Mother Class for all the parsers [Unstructured, LlamaParse, MegaParseVision]"""

@abstractmethod
async def convert(
async def aconvert(
    self,
    file_path: str | Path | None = None,
    file: IO[bytes] | None = None,
    file_extension: FileExtension | None = None,
    **kwargs,
) -> str:
    """
    Asynchronously convert the given file and return the result as a string.

    This base implementation only raises; concrete parsers are expected to
    provide the actual conversion.

    Args:
        file_path (str | Path | None): The path to the file to be converted.
        file (IO[bytes] | None): A binary file object to be converted.
        file_extension (FileExtension | None): The extension of the input file.
        **kwargs: Additional keyword arguments for the conversion process.

    Returns:
        str: The result of the conversion process.

    Raises:
        NotImplementedError: If the method is not implemented by a subclass.
    """
    raise NotImplementedError("Subclasses should implement this method")

@abstractmethod
def convert(
self,
file_path: str | Path | None = None,
file: IO[bytes] | None = None,
Expand Down
19 changes: 17 additions & 2 deletions libs/megaparse/src/megaparse/parser/doctr_parser.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import warnings
from pathlib import Path
from typing import IO, BinaryIO, List

Expand Down Expand Up @@ -59,11 +60,11 @@ def _get_providers(self) -> List[str]:
else:
return ["CPUExecutionProvider"]

async def convert(
def convert(
self,
file_path: str | Path | None = None,
file: IO[bytes] | BinaryIO | None = None,
file_extension: str | FileExtension = "",
file_extension: None | FileExtension = None,
**kwargs,
) -> str:
if file:
Expand All @@ -77,3 +78,17 @@ async def convert(
# Analyze
result = self.predictor(doc)
return result.render()

async def aconvert(
    self,
    file_path: str | Path | None = None,
    file: IO[bytes] | BinaryIO | None = None,
    file_extension: None | FileExtension = None,
    **kwargs,
) -> str:
    """
    Async-compatible wrapper around the synchronous ``convert`` method.

    The call is not awaited or offloaded: ``convert`` runs inline, so a
    ``UserWarning`` is emitted to steer callers towards the sync API.

    Args:
        file_path: Path of the file to convert, forwarded to ``convert``.
        file: Binary file object to convert, forwarded to ``convert``.
        file_extension: Extension of the input, forwarded to ``convert``.
        **kwargs: Extra keyword arguments forwarded to ``convert``.

    Returns:
        str: Whatever ``convert`` returns for the same arguments.
    """
    # The message previously named UnstructuredParser (copy-paste slip),
    # which pointed users at the wrong parser when diagnosing the warning.
    warnings.warn(
        "The DoctrParser is a sync parser, please use the sync convert method",
        UserWarning,
        stacklevel=2,
    )
    return self.convert(file_path, file, file_extension, **kwargs)
32 changes: 30 additions & 2 deletions libs/megaparse/src/megaparse/parser/llama.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from pathlib import Path
from typing import IO, List

Expand Down Expand Up @@ -27,11 +28,11 @@ def __init__(
self.parsing_instruction = """Do not take into account the page breaks (no --- between pages),
do not repeat the header and the footer so the tables are merged if needed. Keep the same format for similar tables."""

async def convert(
async def aconvert(
self,
file_path: str | Path | None = None,
file: IO[bytes] | None = None,
file_extension: str | FileExtension = "",
file_extension: None | FileExtension = None,
**kwargs,
) -> str:
if not file_path:
Expand All @@ -53,3 +54,30 @@ async def convert(
parsed_md = parsed_md + text_content

return parsed_md

def convert(
    self,
    file_path: str | Path | None = None,
    file: IO[bytes] | None = None,
    file_extension: None | FileExtension = None,
    **kwargs,
) -> str:
    """
    Synchronously parse a file with LlamaParse and return its markdown.

    Only ``file_path`` is used by this method; ``file``, ``file_extension``
    and ``**kwargs`` are accepted but not read here.

    Args:
        file_path: Path of the file to parse (required).

    Returns:
        str: The text of every returned document, concatenated in order.

    Raises:
        ValueError: If ``file_path`` is missing or empty.
    """
    if not file_path:
        raise ValueError("File_path should be provided to run LlamaParser")

    client = _LlamaParse(
        api_key=self.api_key,
        result_type=ResultType.MD,
        gpt4o_mode=True,
        verbose=self.verbose,
        language=self.language,
        parsing_instruction=self.parsing_instruction,
    )
    documents: List[LlamaDocument] = client.load_data(str(file_path))

    # Documents are joined without any separator.
    return "".join(document.text for document in documents)
66 changes: 61 additions & 5 deletions libs/megaparse/src/megaparse/parser/megaparse_vision.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
import re
from io import BytesIO
from pathlib import Path
from typing import IO, List
from typing import IO, List, Union

from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.messages import HumanMessage
from megaparse_sdk.schema.extensions import FileExtension
from pdf2image import convert_from_path

from megaparse.parser import BaseParser
Expand Down Expand Up @@ -91,7 +92,7 @@ def get_element(self, tag: TagEnum, chunk: str):
return []
return [elmt.strip() for elmt in all_elmts]

async def send_to_mlm(self, images_data: List[str]) -> str:
async def asend_to_mlm(self, images_data: List[str]) -> str:
"""
Send images to the language model for processing.
Expand All @@ -114,11 +115,34 @@ async def send_to_mlm(self, images_data: List[str]) -> str:
response = await self.model.ainvoke([message])
return str(response.content)

async def convert(
def send_to_mlm(self, images_data: List[str]) -> str:
    """
    Synchronously submit a batch of images to the language model.

    :param images_data: List of base64 encoded images
    :return: The content of the model response, converted to a string
    """
    # A single multimodal message: the OCR prompt first, then one
    # data-URL entry per image, in the order given.
    content = [{"type": "text", "text": BASE_OCR_PROMPT}]
    for encoded_image in images_data:
        content.append(
            {
                "type": "image_url",
                "image_url": {"url": f"data:image/jpeg;base64,{encoded_image}"},
            }
        )

    response = self.model.invoke([HumanMessage(content=content)])
    return str(response.content)

async def aconvert(
self,
file_path: str | Path | None = None,
file: IO[bytes] | None = None,
file_extension: str = "",
file_extension: FileExtension | None = None,
batch_size: int = 3,
**kwargs,
) -> str:
Expand All @@ -136,13 +160,45 @@ async def convert(
file_path = str(file_path)
pdf_base64 = self.process_file(file_path)
tasks = [
self.send_to_mlm(pdf_base64[i : i + batch_size])
self.asend_to_mlm(pdf_base64[i : i + batch_size])
for i in range(0, len(pdf_base64), batch_size)
]
self.parsed_chunks = await asyncio.gather(*tasks)
responses = self.get_cleaned_content("\n".join(self.parsed_chunks))
return responses

def convert(
    self,
    file_path: str | Path | None = None,
    file: IO[bytes] | None = None,
    file_extension: FileExtension | None = None,
    batch_size: int = 3,
    **kwargs,
) -> str:
    """
    Synchronously parse a PDF file with the language model, batch by batch.

    Batches are sent one after another and the raw responses are kept on
    ``self.parsed_chunks`` before being cleaned and joined.

    :param file_path: Path to the PDF file (required)
    :param batch_size: Number of pages to process at a time
    :return: Cleaned parsed content as a single string
    :raises ValueError: If ``file_path`` is missing or empty
    """
    if not file_path:
        raise ValueError("File_path should be provided to run MegaParseVision")

    pdf_path = str(file_path) if isinstance(file_path, Path) else file_path
    pdf_base64 = self.process_file(pdf_path)

    batches = [
        pdf_base64[start : start + batch_size]
        for start in range(0, len(pdf_base64), batch_size)
    ]

    self.parsed_chunks = []
    for batch in batches:
        self.parsed_chunks.append(self.send_to_mlm(batch))

    return self.get_cleaned_content("\n".join(self.parsed_chunks))

def get_cleaned_content(self, parsed_file: str) -> str:
"""
Get cleaned parsed file without any tags defined in TagEnum.
Expand Down
18 changes: 16 additions & 2 deletions libs/megaparse/src/megaparse/parser/unstructured_parser.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import re
import warnings
from pathlib import Path
from typing import IO

Expand Down Expand Up @@ -99,14 +100,27 @@ def get_markdown_line(self, el: dict):

return markdown_line

async def convert(
async def aconvert(
    self,
    file_path: str | Path | None = None,
    file: IO[bytes] | None = None,
    file_extension: FileExtension | None = None,
    **kwargs,
) -> str:
    """
    Async-compatible wrapper that delegates to the synchronous ``convert``.

    Emits a ``UserWarning`` because the underlying work runs inline rather
    than being awaited, then returns whatever ``convert`` returns for the
    same arguments.
    """
    sync_only_notice = (
        "The UnstructuredParser is a sync parser, "
        "please use the sync convert method"
    )
    warnings.warn(sync_only_notice, UserWarning, stacklevel=2)
    return self.convert(file_path, file, file_extension, **kwargs)

def convert(
self,
file_path: str | Path | None = None,
file: IO[bytes] | None = None,
file_extension: FileExtension | None = None,
**kwargs,
) -> str:
# Partition the PDF
elements = partition(
filename=str(file_path) if file_path else None,
file=file,
Expand Down
14 changes: 12 additions & 2 deletions libs/megaparse/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,21 @@ def build(self, *args, **kwargs) -> BaseParser:
"""

class FakeParser(BaseParser):
async def convert(
def convert(
self,
file_path: str | Path | None = None,
file: IO[bytes] | None = None,
file_extension: str | FileExtension = "",
file_extension: None | FileExtension = None,
**kwargs,
) -> str:
print("Fake parser is converting the file")
return "Fake conversion result"

async def aconvert(
self,
file_path: str | Path | None = None,
file: IO[bytes] | None = None,
file_extension: None | FileExtension = None,
**kwargs,
) -> str:
print("Fake parser is converting the file")
Expand Down
31 changes: 31 additions & 0 deletions libs/megaparse/tests/pdf/test_all_parsers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import pytest
from megaparse import MegaParse
from megaparse.parser.doctr_parser import DoctrParser
from megaparse.parser.llama import LlamaParser
from megaparse.parser.megaparse_vision import MegaParseVision
from megaparse.parser.unstructured_parser import UnstructuredParser

PARSER_LIST = [
UnstructuredParser,
DoctrParser,
] # LlamaParser, MegaParseVision are long and costly to test


@pytest.mark.parametrize("parser", PARSER_LIST)
def test_sync_parsers(parser):
    """Each listed parser class yields a non-empty result through MegaParse.load."""
    loader = MegaParse(parser())
    response = loader.load("./tests/data/dummy.pdf")
    print(response)
    assert response
    assert len(response) > 0


@pytest.mark.asyncio
@pytest.mark.parametrize("parser", PARSER_LIST)
async def test_async_parsers(parser):
    """Each listed parser class yields a non-empty result through MegaParse.aload."""
    loader = MegaParse(parser())
    response = await loader.aload("./tests/data/dummy.pdf")
    print(response)
    assert len(response) > 0
2 changes: 1 addition & 1 deletion libs/megaparse/tests/pdf/test_detect_ocr.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@


def test_strategy_all():
pdf = "/Users/amine/Downloads/RAG Corporate 2024 016.pdf"
pdf = "./tests/pdf/sample_pdf.pdf"
strategy = determine_strategy(
pdf, threshold_pages_ocr=0.2, threshold_image_page=0.3
)
Expand Down
13 changes: 0 additions & 13 deletions libs/megaparse/tests/pdf/test_doctr_parser.py

This file was deleted.

12 changes: 0 additions & 12 deletions libs/megaparse/tests/pdf/test_unstructured_parser.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,6 @@
from pathlib import Path

import pypdfium2 as pdfium
import pytest
from megaparse.parser.unstructured_parser import UnstructuredParser
from megaparse_sdk.schema.parser_config import StrategyEnum


@pytest.mark.asyncio
async def test_unstructured_parser():
# scanned pdf
p = Path("./tests/pdf/mlbook.pdf")
processor = UnstructuredParser(strategy=StrategyEnum.FAST)
result = await processor.convert(file_path=p)
assert len(result) > 0


def test_pdfium():
Expand Down

0 comments on commit fbb7d36

Please sign in to comment.