Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer committed Sep 13, 2024
1 parent fb43b12 commit 47dcdeb
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 3 deletions.
22 changes: 20 additions & 2 deletions opteryx/connectors/gcp_firestore_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ class GcpFireStoreConnector(BaseConnector, PredicatePushable):
__mode__ = "Collection"
__type__ = "FIRESTORE"

PUSHABLE_OPS: Dict[str, bool] = {"Eq": True}
PUSHABLE_OPS: Dict[str, bool] = {"Eq": True, "NotEq": True}

OPS_XLAT: Dict[str, str] = {"Eq": "==", "NotEq": "!="}

PUSHABLE_TYPES = {OrsoTypes.BOOLEAN, OrsoTypes.DOUBLE, OrsoTypes.INTEGER, OrsoTypes.VARCHAR}

Expand All @@ -89,13 +91,27 @@ def read_dataset(
"""
from google.cloud.firestore_v1.base_query import FieldFilter

from opteryx.utils.file_decoders import filter_records

database = _initialize()
documents = database.collection(self.dataset)

collected_predicates = []
have_pushed_a_negative = False

if predicates:
for predicate in predicates:
if have_pushed_a_negative and predicate.value == "NotEq":
collected_predicates.append(predicate)
continue
if predicate.value == "NotEq":
have_pushed_a_negative = True
documents = documents.where(
filter=FieldFilter(predicate.left.source_column, "==", predicate.right.value)
filter=FieldFilter(
predicate.left.source_column,
self.OPS_XLAT[predicate.value],
predicate.right.value,
)
)

documents = documents.stream()
Expand All @@ -105,6 +121,8 @@ def read_dataset(
columns=columns,
initial_chunk_size=chunk_size,
):
if collected_predicates:
morsel = filter_records(collected_predicates, morsel)
yield morsel

def get_dataset_schema(self) -> RelationSchema:
Expand Down
5 changes: 5 additions & 0 deletions opteryx/operators/read_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ def config(self):

def execute(self) -> Generator:
"""Perform this step, time how long is spent doing work"""

self.statistics.blobs_read += 0
self.statistics.rows_read += 0
self.statistics.bytes_processed += 0

morsel = None
orso_schema = self.schema
orso_schema_cols = []
Expand Down
48 changes: 47 additions & 1 deletion tests/storage/test_collection_gcs_firestore.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,53 @@ def test_predicate_pushdown_not_equals():
cur = conn.cursor()
cur.execute("SELECT * FROM dwarves WHERE actor != 'Pinto Colvig';")
assert cur.rowcount == 5, cur.rowcount
assert cur.stats["rows_read"] == 7, cur.stats
assert cur.stats["rows_read"] == 5, cur.stats


def test_predicate_pushdown_multiple_not_equals():
"""we don't push these, we get 5 records by Opteryx does the filtering not the source"""
opteryx.register_store("dwarves", GcpFireStoreConnector)
os.environ["GCP_PROJECT_ID"] = "mabeldev"

conn = opteryx.connect()

# TEST PREDICATE PUSHDOWN
cur = conn.cursor()
cur.execute("SELECT * FROM dwarves WHERE actor != 'Pinto Colvig' and actor != 'Sleepy';")
assert cur.rowcount == 5, cur.rowcount
assert cur.stats["rows_read"] == 5, cur.stats

cur = conn.cursor()
cur.execute("SELECT * FROM dwarves WHERE actor != 'Pinto Colvig' and name != 'Sneezy';")
assert cur.rowcount == 4, cur.rowcount
assert cur.stats["rows_read"] == 4, cur.stats

def test_predicate_pushdown_multiple_equals():
"""we don't push these, we get 5 records by Opteryx does the filtering not the source"""
opteryx.register_store("dwarves", GcpFireStoreConnector)
os.environ["GCP_PROJECT_ID"] = "mabeldev"

conn = opteryx.connect()

# TEST PREDICATE PUSHDOWN
cur = conn.cursor()
cur.execute("SELECT * FROM dwarves WHERE actor == 'Pinto Colvig' and actor == 'Sleepy';")
assert cur.rowcount == 0, cur.rowcount
assert cur.stats["rows_read"] == 0, cur.stats

def test_predicate_pushdown_multiple_mixed():
"""we don't push these, we get 5 records by Opteryx does the filtering not the source"""
opteryx.register_store("dwarves", GcpFireStoreConnector)
os.environ["GCP_PROJECT_ID"] = "mabeldev"

conn = opteryx.connect()

# TEST PREDICATE PUSHDOWN
cur = conn.cursor()
cur.execute("SELECT * FROM dwarves WHERE actor == 'Pinto Colvig' and actor != 'Sleepy';")
assert cur.rowcount == 2, cur.rowcount
assert cur.stats["rows_read"] == 2, cur.stats



if __name__ == "__main__": # pragma: no cover
Expand Down

0 comments on commit 47dcdeb

Please sign in to comment.