Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer committed Oct 17, 2024
1 parent 2ad435a commit 9e70304
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 40 deletions.
2 changes: 1 addition & 1 deletion opteryx/__version__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class VersionStatus(Enum):

_major = 0
_minor = 18
_revision = 0
_revision = 1
_status = VersionStatus.RELEASE

__author__ = "@joocer"
Expand Down
43 changes: 5 additions & 38 deletions opteryx/operators/async_read_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import aiohttp
import pyarrow
import pyarrow.parquet
from orso.schema import RelationSchema
from orso.schema import convert_orso_schema_to_arrow_schema

from opteryx import config
Expand All @@ -40,47 +39,13 @@
from opteryx.shared import MemoryPool
from opteryx.utils.file_decoders import get_decoder

from .read_node import normalize_morsel
from .read_node import struct_to_jsonb

CONCURRENT_READS = config.CONCURRENT_READS
MAX_READ_BUFFER_CAPACITY = config.MAX_READ_BUFFER_CAPACITY


def normalize_morsel(schema: RelationSchema, morsel: pyarrow.Table) -> pyarrow.Table:
if len(schema.columns) == 0 and morsel.column_names != ["*"]:
one_column = pyarrow.array([True] * morsel.num_rows, type=pyarrow.bool_())
morsel = morsel.append_column("*", one_column)
return morsel.select(["*"])

# rename columns for internal use
target_column_names = []
# columns in the data but not in the schema, droppable
droppable_columns = []

# find which columns to drop and which columns we already have
for i, column in enumerate(morsel.column_names):
column_name = schema.find_column(column)
if column_name is None:
droppable_columns.append(i)
else:
target_column_names.append(str(column_name))

# remove from the end otherwise we'll remove the wrong columns after the first one
droppable_columns.reverse()
for droppable in droppable_columns:
morsel = morsel.remove_column(droppable)

# remane columns to the internal names (identities)
morsel = morsel.rename_columns(target_column_names)

# add columns we don't have
for column in schema.columns:
if column.identity not in target_column_names:
null_column = pyarrow.array([None] * morsel.num_rows)
morsel = morsel.append_column(column.identity, null_column)

# ensure the columns are in the right order
return morsel.select([col.identity for col in schema.columns])


async def fetch_data(blob_names, pool, reader, reply_queue, statistics):
semaphore = asyncio.Semaphore(CONCURRENT_READS)
session = aiohttp.ClientSession()
Expand Down Expand Up @@ -212,7 +177,9 @@ def execute(self) -> Generator:
num_rows, _, morsel = decoded
self.statistics.rows_seen += num_rows

morsel = struct_to_jsonb(morsel)
morsel = normalize_morsel(orso_schema, morsel)

if arrow_schema:
morsel = morsel.cast(arrow_schema)
else:
Expand Down
2 changes: 1 addition & 1 deletion opteryx/operators/read_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def normalize_morsel(schema: RelationSchema, morsel: pyarrow.Table) -> pyarrow.T
# add columns we don't have, populate with nulls but try to get the correct type
for column in schema.columns:
if column.identity not in target_column_names:
null_column = pyarrow.array([None] * morsel.num_rows)
null_column = pyarrow.array([None] * morsel.num_rows, type=column.arrow_field.type)
field = pyarrow.field(name=column.identity, type=column.arrow_field.type)
morsel = morsel.append_column(field, null_column)

Expand Down

0 comments on commit 9e70304

Please sign in to comment.