Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Overview query breakout #89

Merged
merged 7 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions api.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,63 @@ def search_overview_via_payload(collection: Collection, req: Request, payload: Q
return ES.search_overview(collection.name, payload.q)


@v1.get("/{collection}/search/daily_counts", tags=["data"])
@v1.head("/{collection}/search/daily_counts", include_in_schema=False)
def search_daily_counts(collection: Collection, q: str, req: Request):
"""
Report overview summary of the search result
"""
return ES.daily_counts(collection.name, q)


@v1.post("/{collection}/search/daily_counts", tags=["data"])
def search_daily_counts_via_payload(
collection: Collection, req: Request, payload: Query
):
"""
Report summary of the search result
"""
return ES.daily_counts(collection.name, payload.q)


@v1.get("/{collection}/search/top_languages", tags=["data"])
@v1.head("/{collection}/search/top_languages", include_in_schema=False)
def search_top_languages(collection: Collection, q: str, req: Request):
"""
Report overview summary of the search result
"""
return ES.top_languages(collection.name, q)


@v1.post("/{collection}/search/top_languages", tags=["data"])
def search_top_languages_via_payload(
collection: Collection, req: Request, payload: Query
):
"""
Report summary of the search result
"""
return ES.top_languages(collection.name, payload.q)


@v1.get("/{collection}/search/top_domains", tags=["data"])
@v1.head("/{collection}/search/top_domains", include_in_schema=False)
def search_top_domains(collection: Collection, q: str, req: Request):
"""
Report overview summary of the search result
"""
return ES.top_domains(collection.name, q)


@v1.post("/{collection}/search/top_domains", tags=["data"])
def search_top_domains_via_payload(
collection: Collection, req: Request, payload: Query
):
"""
Report summary of the search result
"""
return ES.top_domains(collection.name, payload.q)


@v1.get("/{collection}/search/result", tags=["data"])
@v1.head("/{collection}/search/result", include_in_schema=False)
def search_result_via_query_params(
Expand Down
120 changes: 90 additions & 30 deletions client.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,21 @@ class QueryBuilder:
VALID_SORT_ORDERS = ["asc", "desc"]
VALID_SORT_FIELDS = ["publication_date", "indexed_date"]

class Aggregators(Enum):
DAILY_COUNTS = {
"dailycounts": {
"date_histogram": {
"field": "publication_date",
"calendar_interval": "day",
"min_doc_count": 1,
}
}
}
TOP_LANGS = {"toplangs": {"terms": {"field": "language.keyword", "size": 100}}}
TOP_DOMAINS = {
"topdomains": {"terms": {"field": "canonical_domain.keyword", "size": 100}}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Field changing from canonical_domain to canonical_domain.keyword?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! In trying to benchmark the latency of these operations I noticed that aggregating on the .keywords field of canonical domain made the query return significantly faster- ultimately it made the overview query almost twice as fast.

}

def __init__(self, query_text):
self.query_text = query_text
self._source = [
Expand Down Expand Up @@ -95,28 +110,17 @@ def basic_query(self, expanded: bool = False) -> Dict:
}
return default

def overview_query(self):
def aggregator_query(self, *aggs: "QueryBuilder.Aggregators") -> Dict:
query = self.basic_query()
query.update(
{
"aggregations": {
"daily": {
"date_histogram": {
"field": "publication_date",
"calendar_interval": "day",
"min_doc_count": 1,
}
},
"lang": {"terms": {"field": "language.keyword", "size": 100}},
"domain": {"terms": {"field": "canonical_domain", "size": 100}},
"tld": {"terms": {"field": "tld", "size": 100}},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we dropping tld?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None of the other layers above this point in the API use TLD- we had been calculating it here, I think, so that we had a backup for when the total count was above the 10,000 document limit imposed by elasticsearch- and we can replace that function with the domain field- so I removed it here in the spirit of speeding up the query! But, happy to consider leaving it in- @rahulbot any thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I can't think of a researc huse of TLD in our current practices. I'd support removal.

Copy link
Member Author

@pgulley pgulley Jul 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to put this snippet of a conversation I had with @ibnesayeed many months ago here for context-

Basically, we had an aggregation on TLDs, so it would report how many matching documents are there for each TLD, which we can sum up to get the better total count. While we had aggregation on other aspects, such as language, date, etc., which I could have used for summation, but I chose TLD for two reasons: 1) it is finite in number, so there are very few entries to add and will never overflow the usual search result limit, and 2) almost all documents will have TLDs associated, unlike language or publication date, which might be empty, hence some documents would go uncounted.

I think this property of TLDs, where it is always filled, is also true of canonical_domain- so we can drop that in instead.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented this backup in the most recent commit

},
"aggregations": {k: v for agg in aggs for k, v in agg.value.items()},
"track_total_hits": True,
}
)
return query

def terms_query(self, field):
def terms_query(self, field) -> Dict:
resct = 200
aggr_map = {
"terms": {
Expand Down Expand Up @@ -170,7 +174,7 @@ def paged_query(
query["search_after"] = [decode_key(resume)]
return query

def article_query(self):
def article_query(self) -> Dict:
default: dict = {
"_source": self._expanded_source,
"query": {"match": {"_id": self.query_text}},
Expand Down Expand Up @@ -249,30 +253,86 @@ def format_day_counts(self, bucket: list):
def format_counts(self, bucket: list):
return {item["key"]: item["doc_count"] for item in bucket}

def search_overview(self, collection: str, q: str):
def aggregator_query(
self, collection: str, q: str, *aggs: QueryBuilder.Aggregators, **options
):
"""
Get overview statistics for a query
Abstraction to DRY out permutations of the 'overview' query getting broken out into their own calls
"""
res = self.ES.search(index=collection, body=QueryBuilder(q).overview_query()) # type: ignore [call-arg]
query_body = QueryBuilder(q).aggregator_query(*aggs)

res = self.ES.search(index=collection, body=query_body) # type: ignore [call-arg]
if not res["hits"]["hits"]:
raise HTTPException(status_code=404, detail="No results found!")

total = res["hits"]["total"]["value"]
tldsum = sum(
item["doc_count"] for item in res["aggregations"]["tld"]["buckets"]
)
return {
return_dict = {
"query": q,
"total": max(total, tldsum),
"topdomains": self.format_counts(res["aggregations"]["domain"]["buckets"]),
"toptlds": self.format_counts(res["aggregations"]["tld"]["buckets"]),
"toplangs": self.format_counts(res["aggregations"]["lang"]["buckets"]),
"dailycounts": self.format_day_counts(
res["aggregations"]["daily"]["buckets"]
),
"matches": [self.format_match(h, collection) for h in res["hits"]["hits"]],
}

# Add the results of each aggregator to the return value
for agg in aggs:
agg_name = next(iter(agg.value.keys()))
pgulley marked this conversation as resolved.
Show resolved Hide resolved
return_dict.update(
{agg_name: self.format_counts(res["aggregations"][agg_name]["buckets"])}
)

# Only return the total and matches if explicitly requested
if "overview" in options:
return_dict.update(
{
"total": total,
"matches": [ # type: ignore [dict-item]
self.format_match(h, collection) for h in res["hits"]["hits"]
],
}
)

return return_dict

def search_overview(self, collection: str, q: str):
"""
Get overview statistics for a query
"""
return self.aggregator_query(
collection,
q,
QueryBuilder.Aggregators.DAILY_COUNTS,
QueryBuilder.Aggregators.TOP_LANGS,
QueryBuilder.Aggregators.TOP_DOMAINS,
overview=True,
)

def daily_counts(self, collection: str, q: str):
"""
Return just a daily count histogram for a query
"""
return self.aggregator_query(
collection,
q,
QueryBuilder.Aggregators.DAILY_COUNTS,
)

def top_languages(self, collection: str, q: str):
"""
Return top languagues for a query
"""
return self.aggregator_query(
collection,
q,
QueryBuilder.Aggregators.TOP_LANGS,
)

def top_domains(self, collection: str, q: str):
"""
Return top domains for a query
"""
return self.aggregator_query(
collection,
q,
QueryBuilder.Aggregators.TOP_DOMAINS,
)

def search_result(
self,
collection: str,
Expand Down
33 changes: 33 additions & 0 deletions test/api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,3 +364,36 @@ def test_top_terms(self):
results = response.json()
assert response.status_code == 200
assert len(results) > 0

def test_daily_counts(self):
response = self._client.post(
f"/v1/{INDEX_NAME}/search/daily_counts",
json={"q": "mediacloud"},
timeout=TIMEOUT,
)

results = response.json()
assert response.status_code == 200
assert "dailycounts" in results

def test_top_languages(self):
response = self._client.post(
f"/v1/{INDEX_NAME}/search/top_languages",
json={"q": "mediacloud"},
timeout=TIMEOUT,
)

results = response.json()
assert response.status_code == 200
assert "toplangs" in results

def test_top_domains(self):
response = self._client.post(
f"/v1/{INDEX_NAME}/search/top_domains",
json={"q": "mediacloud"},
timeout=TIMEOUT,
)

results = response.json()
assert response.status_code == 200
assert "topdomains" in results